risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26    LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46    pub level: MetricLevel,
47
48    // Executor metrics (disabled by default)
49    pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51    // Profiling Metrics:
52    // Aggregated per operator rather than per actor.
53    // These are purely in-memory, never collected by prometheus.
54    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57    // Streaming actor metrics from tokio (disabled by default)
58    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60    actor_poll_duration: RelabeledGuardedIntCounterVec,
61    actor_poll_cnt: RelabeledGuardedIntCounterVec,
62    actor_idle_duration: RelabeledGuardedIntCounterVec,
63    actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65    // Streaming actor
66    pub actor_count: LabelGuardedIntGaugeVec,
67    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
70
71    // Source
72    pub source_output_row_count: LabelGuardedIntCounterVec,
73    pub source_split_change_count: LabelGuardedIntCounterVec,
74    pub source_backfill_row_count: LabelGuardedIntCounterVec,
75
76    // Sink
77    sink_input_row_count: LabelGuardedIntCounterVec,
78    sink_input_bytes: LabelGuardedIntCounterVec,
79    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
80
81    // Exchange (see also `compute::ExchangeServiceMetrics`)
82    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
83
84    // Streaming Merge (We breakout this metric from `barrier_align_duration` because
85    // the alignment happens on different levels)
86    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
87
88    // Backpressure
89    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
90    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91
92    // Streaming Join
93    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
94    pub join_lookup_total_count: LabelGuardedIntCounterVec,
95    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
96    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
97    pub join_match_duration_ns: LabelGuardedIntCounterVec,
98    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
99    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
100
101    // Streaming Join, Streaming Dynamic Filter and Streaming Union
102    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
103
104    // Streaming Aggregation
105    agg_lookup_miss_count: LabelGuardedIntCounterVec,
106    agg_total_lookup_count: LabelGuardedIntCounterVec,
107    agg_cached_entry_count: LabelGuardedIntGaugeVec,
108    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
109    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
110    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
111    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
112    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
113    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
114    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
115    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
116    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
117
118    // Streaming TopN
119    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
120    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
121    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
122    // TODO(rc): why not just use the above three?
123    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
126
127    // Lookup executor
128    lookup_cache_miss_count: LabelGuardedIntCounterVec,
129    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
130    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
131
132    // temporal join
133    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
134    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
135    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
136
137    // Backfill
138    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
139    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
140
141    // CDC Backfill
142    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // Snapshot Backfill
146    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
147
148    // Over Window
149    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
150    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
151    over_window_cache_miss_count: LabelGuardedIntCounterVec,
152    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
153    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
154    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
155    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
156    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
157    over_window_compute_count: LabelGuardedIntCounterVec,
158    over_window_same_output_count: LabelGuardedIntCounterVec,
159
160    /// The duration from receipt of barrier to all actors collection.
161    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
162    /// to flow through the graph.
163    pub barrier_inflight_latency: Histogram,
164    /// The duration of sync to storage.
165    pub barrier_sync_latency: Histogram,
166    pub barrier_batch_size: Histogram,
167    /// The progress made by the earliest in-flight barriers in the local barrier manager.
168    pub barrier_manager_progress: IntCounter,
169
170    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
171    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
172    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
173    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
174    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
177    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
178    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
179    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
180
181    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
182    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
183    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
184    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
185    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
189    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
190    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
191    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
192
193    // Memory management
194    pub lru_runtime_loop_count: IntCounter,
195    pub lru_latest_sequence: IntGauge,
196    pub lru_watermark_sequence: IntGauge,
197    pub lru_eviction_policy: IntGauge,
198    pub jemalloc_allocated_bytes: IntGauge,
199    pub jemalloc_active_bytes: IntGauge,
200    pub jemalloc_resident_bytes: IntGauge,
201    pub jemalloc_metadata_bytes: IntGauge,
202    pub jvm_allocated_bytes: IntGauge,
203    pub jvm_active_bytes: IntGauge,
204    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
205
206    // Materialized view
207    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
208    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
209    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
210    materialize_input_row_count: RelabeledGuardedIntCounterVec,
211    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
212
213    // PostgreSQL CDC LSN monitoring
214    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
215    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
216
217    // MySQL CDC binlog monitoring
218    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
219    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
220
221    // Gap Fill
222    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223
224    // State Table
225    pub state_table_iter_count: RelabeledGuardedIntCounterVec,
226    pub state_table_get_count: RelabeledGuardedIntCounterVec,
227    pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
228    pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
229}
230
231pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
232
233pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
234    GLOBAL_STREAMING_METRICS
235        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
236        .clone()
237}
238
239impl StreamingMetrics {
240    fn new(registry: &Registry, level: MetricLevel) -> Self {
241        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
242            "stream_executor_row_count",
243            "Total number of rows that have been output from each executor",
244            &["actor_id", "fragment_id", "executor_identity"],
245            registry
246        )
247        .unwrap()
248        .relabel_debug_1(level);
249
250        let stream_node_output_row_count = CountMap::new();
251        let stream_node_output_blocking_duration_ns = CountMap::new();
252
253        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
254            "stream_source_output_rows_counts",
255            "Total number of rows that have been output from source",
256            &["source_id", "source_name", "actor_id", "fragment_id"],
257            registry
258        )
259        .unwrap();
260
261        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
262            "stream_source_split_change_event_count",
263            "Total number of split change events that have been operated by source",
264            &["source_id", "source_name", "actor_id", "fragment_id"],
265            registry
266        )
267        .unwrap();
268
269        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
270            "stream_source_backfill_rows_counts",
271            "Total number of rows that have been backfilled for source",
272            &["source_id", "source_name", "actor_id", "fragment_id"],
273            registry
274        )
275        .unwrap();
276
277        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
278            "stream_sink_input_row_count",
279            "Total number of rows streamed into sink executors",
280            &["sink_id", "actor_id", "fragment_id"],
281            registry
282        )
283        .unwrap();
284
285        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
286            "stream_sink_input_bytes",
287            "Total size of chunks streamed into sink executors",
288            &["sink_id", "actor_id", "fragment_id"],
289            registry
290        )
291        .unwrap();
292
293        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
294            "stream_mview_input_row_count",
295            "Total number of rows streamed into materialize executors",
296            &["actor_id", "table_id", "fragment_id"],
297            registry
298        )
299        .unwrap()
300        .relabel_debug_1(level);
301
302        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
303            "stream_mview_current_epoch",
304            "The current epoch of the materialized executor",
305            &["actor_id", "table_id", "fragment_id"],
306            registry
307        )
308        .unwrap()
309        .relabel_debug_1(level);
310
311        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
312            "stream_pg_cdc_state_table_lsn",
313            "Current LSN value stored in PostgreSQL CDC state table",
314            &["source_id"],
315            registry,
316        )
317        .unwrap();
318
319        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
320            "stream_pg_cdc_jni_commit_offset_lsn",
321            "LSN value when JNI commit offset is called for PostgreSQL CDC",
322            &["source_id"],
323            registry,
324        )
325        .unwrap();
326
327        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
328            "stream_mysql_cdc_state_binlog_file_seq",
329            "Current binlog file sequence number stored in MySQL CDC state table",
330            &["source_id"],
331            registry,
332        )
333        .unwrap();
334
335        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
336            "stream_mysql_cdc_state_binlog_position",
337            "Current binlog position stored in MySQL CDC state table",
338            &["source_id"],
339            registry,
340        )
341        .unwrap();
342
343        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
344            "stream_sink_chunk_buffer_size",
345            "Total size of chunks buffered in a barrier",
346            &["sink_id", "actor_id", "fragment_id"],
347            registry
348        )
349        .unwrap();
350        let actor_output_buffer_blocking_duration_ns =
351            register_guarded_int_counter_vec_with_registry!(
352                "stream_actor_output_buffer_blocking_duration_ns",
353                "Total blocking duration (ns) of output buffer",
354                &["actor_id", "fragment_id", "downstream_fragment_id"],
355                registry
356            )
357            .unwrap()
358            // mask the first label `actor_id` if the level is less verbose than `Debug`
359            .relabel_debug_1(level);
360
361        let actor_input_buffer_blocking_duration_ns =
362            register_guarded_int_counter_vec_with_registry!(
363                "stream_actor_input_buffer_blocking_duration_ns",
364                "Total blocking duration (ns) of input buffer",
365                &["actor_id", "fragment_id", "upstream_fragment_id"],
366                registry
367            )
368            .unwrap()
369            // mask the first label `actor_id` if the level is less verbose than `Debug`
370            .relabel_debug_1(level);
371
372        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
373            "stream_exchange_frag_recv_size",
374            "Total size of messages that have been received from upstream Fragment",
375            &["up_fragment_id", "down_fragment_id"],
376            registry
377        )
378        .unwrap();
379
380        let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
381            "stream_actor_poll_duration",
382            "tokio's metrics",
383            &["actor_id", "fragment_id"],
384            registry
385        )
386        .unwrap()
387        .relabel_debug_1(level);
388
389        let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
390            "stream_actor_poll_cnt",
391            "tokio's metrics",
392            &["actor_id", "fragment_id"],
393            registry
394        )
395        .unwrap()
396        .relabel_debug_1(level);
397
398        let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
399            "stream_actor_scheduled_duration",
400            "tokio's metrics",
401            &["actor_id", "fragment_id"],
402            registry
403        )
404        .unwrap()
405        .relabel_debug_1(level);
406
407        let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
408            "stream_actor_scheduled_cnt",
409            "tokio's metrics",
410            &["actor_id", "fragment_id"],
411            registry
412        )
413        .unwrap()
414        .relabel_debug_1(level);
415
416        let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
417            "stream_actor_idle_duration",
418            "tokio's metrics",
419            &["actor_id", "fragment_id"],
420            registry
421        )
422        .unwrap()
423        .relabel_debug_1(level);
424
425        let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
426            "stream_actor_idle_cnt",
427            "tokio's metrics",
428            &["actor_id", "fragment_id"],
429            registry
430        )
431        .unwrap()
432        .relabel_debug_1(level);
433
434        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
435            "stream_actor_in_record_cnt",
436            "Total number of rows actor received",
437            &["actor_id", "fragment_id", "upstream_fragment_id"],
438            registry
439        )
440        .unwrap()
441        .relabel_debug_1(level);
442
443        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
444            "stream_actor_out_record_cnt",
445            "Total number of rows actor sent",
446            &["actor_id", "fragment_id"],
447            registry
448        )
449        .unwrap()
450        .relabel_debug_1(level);
451
452        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
453            "stream_actor_current_epoch",
454            "Current epoch of actor",
455            &["actor_id", "fragment_id"],
456            registry
457        )
458        .unwrap()
459        .relabel_debug_1(level);
460
461        let actor_count = register_guarded_int_gauge_vec_with_registry!(
462            "stream_actor_count",
463            "Total number of actors (parallelism)",
464            &["fragment_id"],
465            registry
466        )
467        .unwrap();
468
469        let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
470            "stream_merge_barrier_align_duration_ns",
471            "Total merge barrier alignment duration (ns)",
472            &["actor_id", "fragment_id"],
473            registry
474        )
475        .unwrap()
476        .relabel_debug_1(level);
477
478        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
479            "stream_join_lookup_miss_count",
480            "Join executor lookup miss duration",
481            &["side", "join_table_id", "actor_id", "fragment_id"],
482            registry
483        )
484        .unwrap();
485
486        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
487            "stream_join_lookup_total_count",
488            "Join executor lookup total operation",
489            &["side", "join_table_id", "actor_id", "fragment_id"],
490            registry
491        )
492        .unwrap();
493
494        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
495            "stream_join_insert_cache_miss_count",
496            "Join executor cache miss when insert operation",
497            &["side", "join_table_id", "actor_id", "fragment_id"],
498            registry
499        )
500        .unwrap();
501
502        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
503            "stream_join_actor_input_waiting_duration_ns",
504            "Total waiting duration (ns) of input buffer of join actor",
505            &["actor_id", "fragment_id"],
506            registry
507        )
508        .unwrap();
509
510        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
511            "stream_join_match_duration_ns",
512            "Matching duration for each side",
513            &["actor_id", "fragment_id", "side"],
514            registry
515        )
516        .unwrap();
517
518        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
519            "stream_barrier_align_duration_ns",
520            "Duration of join align barrier",
521            &["actor_id", "fragment_id", "wait_side", "executor"],
522            registry
523        )
524        .unwrap()
525        .relabel_debug_1(level);
526
527        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
528            "stream_join_cached_entry_count",
529            "Number of cached entries in streaming join operators",
530            &["actor_id", "fragment_id", "side"],
531            registry
532        )
533        .unwrap();
534
535        let join_matched_join_keys_opts = histogram_opts!(
536            "stream_join_matched_join_keys",
537            "The number of keys matched in the opposite side",
538            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
539        );
540
541        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
542            join_matched_join_keys_opts,
543            &["actor_id", "fragment_id", "table_id"],
544            registry
545        )
546        .unwrap()
547        .relabel_debug_1(level);
548
549        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
550            "stream_agg_lookup_miss_count",
551            "Aggregation executor lookup miss duration",
552            &["table_id", "actor_id", "fragment_id"],
553            registry
554        )
555        .unwrap();
556
557        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
558            "stream_agg_lookup_total_count",
559            "Aggregation executor lookup total operation",
560            &["table_id", "actor_id", "fragment_id"],
561            registry
562        )
563        .unwrap();
564
565        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
566            "stream_agg_distinct_cache_miss_count",
567            "Aggregation executor dinsinct miss duration",
568            &["table_id", "actor_id", "fragment_id"],
569            registry
570        )
571        .unwrap();
572
573        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
574            "stream_agg_distinct_total_cache_count",
575            "Aggregation executor distinct total operation",
576            &["table_id", "actor_id", "fragment_id"],
577            registry
578        )
579        .unwrap();
580
581        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
582            "stream_agg_distinct_cached_entry_count",
583            "Total entry counts in distinct aggregation executor cache",
584            &["table_id", "actor_id", "fragment_id"],
585            registry
586        )
587        .unwrap();
588
589        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
590            "stream_agg_dirty_groups_count",
591            "Total dirty group counts in aggregation executor",
592            &["table_id", "actor_id", "fragment_id"],
593            registry
594        )
595        .unwrap();
596
597        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
598            "stream_agg_dirty_groups_heap_size",
599            "Total dirty group heap size in aggregation executor",
600            &["table_id", "actor_id", "fragment_id"],
601            registry
602        )
603        .unwrap();
604
605        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
606            "stream_agg_state_cache_lookup_count",
607            "Aggregation executor state cache lookup count",
608            &["table_id", "actor_id", "fragment_id"],
609            registry
610        )
611        .unwrap();
612
613        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
614            "stream_agg_state_cache_miss_count",
615            "Aggregation executor state cache miss count",
616            &["table_id", "actor_id", "fragment_id"],
617            registry
618        )
619        .unwrap();
620
621        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
622            "stream_group_top_n_cache_miss_count",
623            "Group top n executor cache miss count",
624            &["table_id", "actor_id", "fragment_id"],
625            registry
626        )
627        .unwrap();
628
629        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
630            "stream_group_top_n_total_query_cache_count",
631            "Group top n executor query cache total count",
632            &["table_id", "actor_id", "fragment_id"],
633            registry
634        )
635        .unwrap();
636
637        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
638            "stream_group_top_n_cached_entry_count",
639            "Total entry counts in group top n executor cache",
640            &["table_id", "actor_id", "fragment_id"],
641            registry
642        )
643        .unwrap();
644
645        let group_top_n_appendonly_cache_miss_count =
646            register_guarded_int_counter_vec_with_registry!(
647                "stream_group_top_n_appendonly_cache_miss_count",
648                "Group top n appendonly executor cache miss count",
649                &["table_id", "actor_id", "fragment_id"],
650                registry
651            )
652            .unwrap();
653
654        let group_top_n_appendonly_total_query_cache_count =
655            register_guarded_int_counter_vec_with_registry!(
656                "stream_group_top_n_appendonly_total_query_cache_count",
657                "Group top n appendonly executor total cache count",
658                &["table_id", "actor_id", "fragment_id"],
659                registry
660            )
661            .unwrap();
662
663        let group_top_n_appendonly_cached_entry_count =
664            register_guarded_int_gauge_vec_with_registry!(
665                "stream_group_top_n_appendonly_cached_entry_count",
666                "Total entry counts in group top n appendonly executor cache",
667                &["table_id", "actor_id", "fragment_id"],
668                registry
669            )
670            .unwrap();
671
672        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
673            "stream_lookup_cache_miss_count",
674            "Lookup executor cache miss count",
675            &["table_id", "actor_id", "fragment_id"],
676            registry
677        )
678        .unwrap();
679
680        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
681            "stream_lookup_total_query_cache_count",
682            "Lookup executor query cache total count",
683            &["table_id", "actor_id", "fragment_id"],
684            registry
685        )
686        .unwrap();
687
688        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
689            "stream_lookup_cached_entry_count",
690            "Total entry counts in lookup executor cache",
691            &["table_id", "actor_id", "fragment_id"],
692            registry
693        )
694        .unwrap();
695
696        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
697            "stream_temporal_join_cache_miss_count",
698            "Temporal join executor cache miss count",
699            &["table_id", "actor_id", "fragment_id"],
700            registry
701        )
702        .unwrap();
703
704        let temporal_join_total_query_cache_count =
705            register_guarded_int_counter_vec_with_registry!(
706                "stream_temporal_join_total_query_cache_count",
707                "Temporal join executor query cache total count",
708                &["table_id", "actor_id", "fragment_id"],
709                registry
710            )
711            .unwrap();
712
713        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
714            "stream_temporal_join_cached_entry_count",
715            "Total entry count in temporal join executor cache",
716            &["table_id", "actor_id", "fragment_id"],
717            registry
718        )
719        .unwrap();
720
721        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
722            "stream_agg_cached_entry_count",
723            "Number of cached keys in streaming aggregation operators",
724            &["table_id", "actor_id", "fragment_id"],
725            registry
726        )
727        .unwrap();
728
729        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
730            "stream_agg_chunk_lookup_miss_count",
731            "Aggregation executor chunk-level lookup miss duration",
732            &["table_id", "actor_id", "fragment_id"],
733            registry
734        )
735        .unwrap();
736
737        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
738            "stream_agg_chunk_lookup_total_count",
739            "Aggregation executor chunk-level lookup total operation",
740            &["table_id", "actor_id", "fragment_id"],
741            registry
742        )
743        .unwrap();
744
745        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
746            "stream_backfill_snapshot_read_row_count",
747            "Total number of rows that have been read from the backfill snapshot",
748            &["table_id", "actor_id"],
749            registry
750        )
751        .unwrap();
752
753        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
754            "stream_backfill_upstream_output_row_count",
755            "Total number of rows that have been output from the backfill upstream",
756            &["table_id", "actor_id"],
757            registry
758        )
759        .unwrap();
760
761        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
762            "stream_cdc_backfill_snapshot_read_row_count",
763            "Total number of rows that have been read from the cdc_backfill snapshot",
764            &["table_id", "actor_id"],
765            registry
766        )
767        .unwrap();
768
769        let cdc_backfill_upstream_output_row_count =
770            register_guarded_int_counter_vec_with_registry!(
771                "stream_cdc_backfill_upstream_output_row_count",
772                "Total number of rows that have been output from the cdc_backfill upstream",
773                &["table_id", "actor_id"],
774                registry
775            )
776            .unwrap();
777
778        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
779            "stream_snapshot_backfill_consume_snapshot_row_count",
780            "Total number of rows that have been output from snapshot backfill",
781            &["table_id", "actor_id", "stage"],
782            registry
783        )
784        .unwrap();
785
786        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
787            "stream_over_window_cached_entry_count",
788            "Total entry (partition) count in over window executor cache",
789            &["table_id", "actor_id", "fragment_id"],
790            registry
791        )
792        .unwrap();
793
794        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
795            "stream_over_window_cache_lookup_count",
796            "Over window executor cache lookup count",
797            &["table_id", "actor_id", "fragment_id"],
798            registry
799        )
800        .unwrap();
801
802        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
803            "stream_over_window_cache_miss_count",
804            "Over window executor cache miss count",
805            &["table_id", "actor_id", "fragment_id"],
806            registry
807        )
808        .unwrap();
809
810        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
811            "stream_over_window_range_cache_entry_count",
812            "Over window partition range cache entry count",
813            &["table_id", "actor_id", "fragment_id"],
814            registry,
815        )
816        .unwrap();
817
818        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
819            "stream_over_window_range_cache_lookup_count",
820            "Over window partition range cache lookup count",
821            &["table_id", "actor_id", "fragment_id"],
822            registry
823        )
824        .unwrap();
825
826        let over_window_range_cache_left_miss_count =
827            register_guarded_int_counter_vec_with_registry!(
828                "stream_over_window_range_cache_left_miss_count",
829                "Over window partition range cache left miss count",
830                &["table_id", "actor_id", "fragment_id"],
831                registry
832            )
833            .unwrap();
834
835        let over_window_range_cache_right_miss_count =
836            register_guarded_int_counter_vec_with_registry!(
837                "stream_over_window_range_cache_right_miss_count",
838                "Over window partition range cache right miss count",
839                &["table_id", "actor_id", "fragment_id"],
840                registry
841            )
842            .unwrap();
843
844        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
845            "stream_over_window_accessed_entry_count",
846            "Over window accessed entry count",
847            &["table_id", "actor_id", "fragment_id"],
848            registry
849        )
850        .unwrap();
851
852        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
853            "stream_over_window_compute_count",
854            "Over window compute count",
855            &["table_id", "actor_id", "fragment_id"],
856            registry
857        )
858        .unwrap();
859
860        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
861            "stream_over_window_same_output_count",
862            "Over window same output count",
863            &["table_id", "actor_id", "fragment_id"],
864            registry
865        )
866        .unwrap();
867
868        let opts = histogram_opts!(
869            "stream_barrier_inflight_duration_seconds",
870            "barrier_inflight_latency",
871            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
872        );
873        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
874
875        let opts = histogram_opts!(
876            "stream_barrier_sync_storage_duration_seconds",
877            "barrier_sync_latency",
878            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
879        );
880        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
881
882        let opts = histogram_opts!(
883            "stream_barrier_batch_size",
884            "barrier_batch_size",
885            exponential_buckets(1.0, 2.0, 8).unwrap()
886        );
887        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
888
889        let barrier_manager_progress = register_int_counter_with_registry!(
890            "stream_barrier_manager_progress",
891            "The number of actors that have processed the earliest in-flight barriers",
892            registry
893        )
894        .unwrap();
895
896        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
897            "sync_kv_log_store_wait_next_poll_ns",
898            "Total duration (ns) of waiting for next poll",
899            &["actor_id", "target", "fragment_id", "relation"],
900            registry
901        )
902        .unwrap();
903
904        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
905            "sync_kv_log_store_read_count",
906            "read row count throughput of sync_kv log store",
907            &["type", "actor_id", "target", "fragment_id", "relation"],
908            registry
909        )
910        .unwrap();
911
912        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
913            "sync_kv_log_store_read_size",
914            "read size throughput of sync_kv log store",
915            &["type", "actor_id", "target", "fragment_id", "relation"],
916            registry
917        )
918        .unwrap();
919
920        let sync_kv_log_store_write_pause_duration_ns =
921            register_guarded_int_counter_vec_with_registry!(
922                "sync_kv_log_store_write_pause_duration_ns",
923                "Duration (ns) of sync_kv log store write pause",
924                &["actor_id", "target", "fragment_id", "relation"],
925                registry
926            )
927            .unwrap();
928
929        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
930            "sync_kv_log_store_state",
931            "clean/unclean state transition for sync_kv log store",
932            &["state", "actor_id", "target", "fragment_id", "relation"],
933            registry
934        )
935        .unwrap();
936
937        let sync_kv_log_store_storage_write_count =
938            register_guarded_int_counter_vec_with_registry!(
939                "sync_kv_log_store_storage_write_count",
940                "Write row count throughput of sync_kv log store",
941                &["actor_id", "target", "fragment_id", "relation"],
942                registry
943            )
944            .unwrap();
945
946        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
947            "sync_kv_log_store_storage_write_size",
948            "Write size throughput of sync_kv log store",
949            &["actor_id", "target", "fragment_id", "relation"],
950            registry
951        )
952        .unwrap();
953
954        let sync_kv_log_store_buffer_unconsumed_item_count =
955            register_guarded_int_gauge_vec_with_registry!(
956                "sync_kv_log_store_buffer_unconsumed_item_count",
957                "Number of Unconsumed Item in buffer",
958                &["actor_id", "target", "fragment_id", "relation"],
959                registry
960            )
961            .unwrap();
962
963        let sync_kv_log_store_buffer_unconsumed_row_count =
964            register_guarded_int_gauge_vec_with_registry!(
965                "sync_kv_log_store_buffer_unconsumed_row_count",
966                "Number of Unconsumed Row in buffer",
967                &["actor_id", "target", "fragment_id", "relation"],
968                registry
969            )
970            .unwrap();
971
972        let sync_kv_log_store_buffer_unconsumed_epoch_count =
973            register_guarded_int_gauge_vec_with_registry!(
974                "sync_kv_log_store_buffer_unconsumed_epoch_count",
975                "Number of Unconsumed Epoch in buffer",
976                &["actor_id", "target", "fragment_id", "relation"],
977                registry
978            )
979            .unwrap();
980
981        let sync_kv_log_store_buffer_unconsumed_min_epoch =
982            register_guarded_int_gauge_vec_with_registry!(
983                "sync_kv_log_store_buffer_unconsumed_min_epoch",
984                "Number of Unconsumed Epoch in buffer",
985                &["actor_id", "target", "fragment_id", "relation"],
986                registry
987            )
988            .unwrap();
989
990        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
991            "kv_log_store_storage_write_count",
992            "Write row count throughput of kv log store",
993            &["actor_id", "connector", "sink_id", "sink_name"],
994            registry
995        )
996        .unwrap();
997
998        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
999            "kv_log_store_storage_write_size",
1000            "Write size throughput of kv log store",
1001            &["actor_id", "connector", "sink_id", "sink_name"],
1002            registry
1003        )
1004        .unwrap();
1005
1006        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1007            "kv_log_store_storage_read_count",
1008            "Write row count throughput of kv log store",
1009            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1010            registry
1011        )
1012        .unwrap();
1013
1014        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1015            "kv_log_store_storage_read_size",
1016            "Write size throughput of kv log store",
1017            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1018            registry
1019        )
1020        .unwrap();
1021
1022        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1023            "kv_log_store_rewind_count",
1024            "Kv log store rewind rate",
1025            &["actor_id", "connector", "sink_id", "sink_name"],
1026            registry
1027        )
1028        .unwrap();
1029
1030        let kv_log_store_rewind_delay_opts = {
1031            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1032            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1033                - REWIND_BASE_DELAY.as_secs_f64().log2())
1034            .ceil() as usize;
1035            let buckets = exponential_buckets(
1036                REWIND_BASE_DELAY.as_secs_f64(),
1037                REWIND_BACKOFF_FACTOR as _,
1038                bucket_count,
1039            )
1040            .unwrap();
1041            histogram_opts!(
1042                "kv_log_store_rewind_delay",
1043                "Kv log store rewind delay",
1044                buckets,
1045            )
1046        };
1047
1048        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1049            kv_log_store_rewind_delay_opts,
1050            &["actor_id", "connector", "sink_id", "sink_name"],
1051            registry
1052        )
1053        .unwrap();
1054
1055        let kv_log_store_buffer_unconsumed_item_count =
1056            register_guarded_int_gauge_vec_with_registry!(
1057                "kv_log_store_buffer_unconsumed_item_count",
1058                "Number of Unconsumed Item in buffer",
1059                &["actor_id", "connector", "sink_id", "sink_name"],
1060                registry
1061            )
1062            .unwrap();
1063
1064        let kv_log_store_buffer_unconsumed_row_count =
1065            register_guarded_int_gauge_vec_with_registry!(
1066                "kv_log_store_buffer_unconsumed_row_count",
1067                "Number of Unconsumed Row in buffer",
1068                &["actor_id", "connector", "sink_id", "sink_name"],
1069                registry
1070            )
1071            .unwrap();
1072
1073        let kv_log_store_buffer_unconsumed_epoch_count =
1074            register_guarded_int_gauge_vec_with_registry!(
1075                "kv_log_store_buffer_unconsumed_epoch_count",
1076                "Number of Unconsumed Epoch in buffer",
1077                &["actor_id", "connector", "sink_id", "sink_name"],
1078                registry
1079            )
1080            .unwrap();
1081
1082        let kv_log_store_buffer_unconsumed_min_epoch =
1083            register_guarded_int_gauge_vec_with_registry!(
1084                "kv_log_store_buffer_unconsumed_min_epoch",
1085                "Number of Unconsumed Epoch in buffer",
1086                &["actor_id", "connector", "sink_id", "sink_name"],
1087                registry
1088            )
1089            .unwrap();
1090
1091        let lru_runtime_loop_count = register_int_counter_with_registry!(
1092            "lru_runtime_loop_count",
1093            "The counts of the eviction loop in LRU manager per second",
1094            registry
1095        )
1096        .unwrap();
1097
1098        let lru_latest_sequence = register_int_gauge_with_registry!(
1099            "lru_latest_sequence",
1100            "Current LRU global sequence",
1101            registry,
1102        )
1103        .unwrap();
1104
1105        let lru_watermark_sequence = register_int_gauge_with_registry!(
1106            "lru_watermark_sequence",
1107            "Current LRU watermark sequence",
1108            registry,
1109        )
1110        .unwrap();
1111
1112        let lru_eviction_policy = register_int_gauge_with_registry!(
1113            "lru_eviction_policy",
1114            "Current LRU eviction policy",
1115            registry,
1116        )
1117        .unwrap();
1118
1119        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1120            "jemalloc_allocated_bytes",
1121            "The allocated memory jemalloc, got from jemalloc_ctl",
1122            registry
1123        )
1124        .unwrap();
1125
1126        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1127            "jemalloc_active_bytes",
1128            "The active memory jemalloc, got from jemalloc_ctl",
1129            registry
1130        )
1131        .unwrap();
1132
1133        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1134            "jemalloc_resident_bytes",
1135            "The active memory jemalloc, got from jemalloc_ctl",
1136            registry
1137        )
1138        .unwrap();
1139
1140        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1141            "jemalloc_metadata_bytes",
1142            "The active memory jemalloc, got from jemalloc_ctl",
1143            registry
1144        )
1145        .unwrap();
1146
1147        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1148            "jvm_allocated_bytes",
1149            "The allocated jvm memory",
1150            registry
1151        )
1152        .unwrap();
1153
1154        let jvm_active_bytes = register_int_gauge_with_registry!(
1155            "jvm_active_bytes",
1156            "The active jvm memory",
1157            registry
1158        )
1159        .unwrap();
1160
1161        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1162            "stream_materialize_cache_hit_count",
1163            "Materialize executor cache hit count",
1164            &["actor_id", "table_id", "fragment_id"],
1165            registry
1166        )
1167        .unwrap()
1168        .relabel_debug_1(level);
1169
1170        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1171            "stream_materialize_data_exist_count",
1172            "Materialize executor data exist count",
1173            &["actor_id", "table_id", "fragment_id"],
1174            registry
1175        )
1176        .unwrap()
1177        .relabel_debug_1(level);
1178
1179        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1180            "stream_materialize_cache_total_count",
1181            "Materialize executor cache total operation",
1182            &["actor_id", "table_id", "fragment_id"],
1183            registry
1184        )
1185        .unwrap()
1186        .relabel_debug_1(level);
1187
1188        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1189            "stream_memory_usage",
1190            "Memory usage for stream executors",
1191            &["actor_id", "table_id", "desc"],
1192            registry
1193        )
1194        .unwrap()
1195        .relabel_debug_1(level);
1196
1197        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1198            "gap_fill_generated_rows_count",
1199            "Total number of rows generated by gap fill executor",
1200            &["actor_id", "fragment_id"],
1201            registry
1202        )
1203        .unwrap()
1204        .relabel_debug_1(level);
1205
1206        let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1207            "state_table_iter_count",
1208            "Total number of state table iter operations",
1209            &["actor_id", "fragment_id", "table_id"],
1210            registry
1211        )
1212        .unwrap()
1213        .relabel_debug_1(level);
1214
1215        let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1216            "state_table_get_count",
1217            "Total number of state table get operations",
1218            &["actor_id", "fragment_id", "table_id"],
1219            registry
1220        )
1221        .unwrap()
1222        .relabel_debug_1(level);
1223
1224        let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1225            "state_table_iter_vnode_pruned_count",
1226            "Total number of state table iter operations pruned by vnode statistics",
1227            &["actor_id", "fragment_id", "table_id"],
1228            registry
1229        )
1230        .unwrap()
1231        .relabel_debug_1(level);
1232
1233        let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1234            "state_table_get_vnode_pruned_count",
1235            "Total number of state table get operations pruned by vnode statistics",
1236            &["actor_id", "fragment_id", "table_id"],
1237            registry
1238        )
1239        .unwrap()
1240        .relabel_debug_1(level);
1241
1242        Self {
1243            level,
1244            executor_row_count,
1245            mem_stream_node_output_row_count: stream_node_output_row_count,
1246            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1247            actor_scheduled_duration,
1248            actor_scheduled_cnt,
1249            actor_poll_duration,
1250            actor_poll_cnt,
1251            actor_idle_duration,
1252            actor_idle_cnt,
1253            actor_count,
1254            actor_in_record_cnt,
1255            actor_out_record_cnt,
1256            actor_current_epoch,
1257            source_output_row_count,
1258            source_split_change_count,
1259            source_backfill_row_count,
1260            sink_input_row_count,
1261            sink_input_bytes,
1262            sink_chunk_buffer_size,
1263            exchange_frag_recv_size,
1264            merge_barrier_align_duration,
1265            actor_output_buffer_blocking_duration_ns,
1266            actor_input_buffer_blocking_duration_ns,
1267            join_lookup_miss_count,
1268            join_lookup_total_count,
1269            join_insert_cache_miss_count,
1270            join_actor_input_waiting_duration_ns,
1271            join_match_duration_ns,
1272            join_cached_entry_count,
1273            join_matched_join_keys,
1274            barrier_align_duration,
1275            agg_lookup_miss_count,
1276            agg_total_lookup_count,
1277            agg_cached_entry_count,
1278            agg_chunk_lookup_miss_count,
1279            agg_chunk_total_lookup_count,
1280            agg_dirty_groups_count,
1281            agg_dirty_groups_heap_size,
1282            agg_distinct_cache_miss_count,
1283            agg_distinct_total_cache_count,
1284            agg_distinct_cached_entry_count,
1285            agg_state_cache_lookup_count,
1286            agg_state_cache_miss_count,
1287            group_top_n_cache_miss_count,
1288            group_top_n_total_query_cache_count,
1289            group_top_n_cached_entry_count,
1290            group_top_n_appendonly_cache_miss_count,
1291            group_top_n_appendonly_total_query_cache_count,
1292            group_top_n_appendonly_cached_entry_count,
1293            lookup_cache_miss_count,
1294            lookup_total_query_cache_count,
1295            lookup_cached_entry_count,
1296            temporal_join_cache_miss_count,
1297            temporal_join_total_query_cache_count,
1298            temporal_join_cached_entry_count,
1299            backfill_snapshot_read_row_count,
1300            backfill_upstream_output_row_count,
1301            cdc_backfill_snapshot_read_row_count,
1302            cdc_backfill_upstream_output_row_count,
1303            snapshot_backfill_consume_row_count,
1304            over_window_cached_entry_count,
1305            over_window_cache_lookup_count,
1306            over_window_cache_miss_count,
1307            over_window_range_cache_entry_count,
1308            over_window_range_cache_lookup_count,
1309            over_window_range_cache_left_miss_count,
1310            over_window_range_cache_right_miss_count,
1311            over_window_accessed_entry_count,
1312            over_window_compute_count,
1313            over_window_same_output_count,
1314            barrier_inflight_latency,
1315            barrier_sync_latency,
1316            barrier_batch_size,
1317            barrier_manager_progress,
1318            kv_log_store_storage_write_count,
1319            kv_log_store_storage_write_size,
1320            kv_log_store_rewind_count,
1321            kv_log_store_rewind_delay,
1322            kv_log_store_storage_read_count,
1323            kv_log_store_storage_read_size,
1324            kv_log_store_buffer_unconsumed_item_count,
1325            kv_log_store_buffer_unconsumed_row_count,
1326            kv_log_store_buffer_unconsumed_epoch_count,
1327            kv_log_store_buffer_unconsumed_min_epoch,
1328            sync_kv_log_store_read_count,
1329            sync_kv_log_store_read_size,
1330            sync_kv_log_store_write_pause_duration_ns,
1331            sync_kv_log_store_state,
1332            sync_kv_log_store_wait_next_poll_ns,
1333            sync_kv_log_store_storage_write_count,
1334            sync_kv_log_store_storage_write_size,
1335            sync_kv_log_store_buffer_unconsumed_item_count,
1336            sync_kv_log_store_buffer_unconsumed_row_count,
1337            sync_kv_log_store_buffer_unconsumed_epoch_count,
1338            sync_kv_log_store_buffer_unconsumed_min_epoch,
1339            lru_runtime_loop_count,
1340            lru_latest_sequence,
1341            lru_watermark_sequence,
1342            lru_eviction_policy,
1343            jemalloc_allocated_bytes,
1344            jemalloc_active_bytes,
1345            jemalloc_resident_bytes,
1346            jemalloc_metadata_bytes,
1347            jvm_allocated_bytes,
1348            jvm_active_bytes,
1349            stream_memory_usage,
1350            materialize_cache_hit_count,
1351            materialize_data_exist_count,
1352            materialize_cache_total_count,
1353            materialize_input_row_count,
1354            materialize_current_epoch,
1355            pg_cdc_state_table_lsn,
1356            pg_cdc_jni_commit_offset_lsn,
1357            mysql_cdc_state_binlog_file_seq,
1358            mysql_cdc_state_binlog_position,
1359            gap_fill_generated_rows_count,
1360            state_table_iter_count,
1361            state_table_get_count,
1362            state_table_iter_vnode_pruned_count,
1363            state_table_get_vnode_pruned_count,
1364        }
1365    }
1366
1367    /// Create a new `StreamingMetrics` instance used in tests or other places.
1368    pub fn unused() -> Self {
1369        global_streaming_metrics(MetricLevel::Disabled)
1370    }
1371
1372    pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1373        let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1374        let actor_scheduled_duration = self
1375            .actor_scheduled_duration
1376            .with_guarded_label_values(label_list);
1377        let actor_scheduled_cnt = self
1378            .actor_scheduled_cnt
1379            .with_guarded_label_values(label_list);
1380        let actor_poll_duration = self
1381            .actor_poll_duration
1382            .with_guarded_label_values(label_list);
1383        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1384        let actor_idle_duration = self
1385            .actor_idle_duration
1386            .with_guarded_label_values(label_list);
1387        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1388        ActorMetrics {
1389            actor_scheduled_duration,
1390            actor_scheduled_cnt,
1391            actor_poll_duration,
1392            actor_poll_cnt,
1393            actor_idle_duration,
1394            actor_idle_cnt,
1395        }
1396    }
1397
1398    pub(crate) fn new_actor_input_metrics(
1399        &self,
1400        actor_id: ActorId,
1401        fragment_id: FragmentId,
1402        upstream_fragment_id: FragmentId,
1403    ) -> ActorInputMetrics {
1404        let actor_id_str = actor_id.to_string();
1405        let fragment_id_str = fragment_id.to_string();
1406        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1407        ActorInputMetrics {
1408            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1409                &actor_id_str,
1410                &fragment_id_str,
1411                &upstream_fragment_id_str,
1412            ]),
1413            actor_input_buffer_blocking_duration_ns: self
1414                .actor_input_buffer_blocking_duration_ns
1415                .with_guarded_label_values(&[
1416                    &actor_id_str,
1417                    &fragment_id_str,
1418                    &upstream_fragment_id_str,
1419                ]),
1420        }
1421    }
1422
1423    pub fn new_sink_exec_metrics(
1424        &self,
1425        id: SinkId,
1426        actor_id: ActorId,
1427        fragment_id: FragmentId,
1428    ) -> SinkExecutorMetrics {
1429        let label_list: &[&str; 3] = &[
1430            &id.to_string(),
1431            &actor_id.to_string(),
1432            &fragment_id.to_string(),
1433        ];
1434        SinkExecutorMetrics {
1435            sink_input_row_count: self
1436                .sink_input_row_count
1437                .with_guarded_label_values(label_list),
1438            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1439            sink_chunk_buffer_size: self
1440                .sink_chunk_buffer_size
1441                .with_guarded_label_values(label_list),
1442        }
1443    }
1444
1445    pub fn new_group_top_n_metrics(
1446        &self,
1447        table_id: TableId,
1448        actor_id: ActorId,
1449        fragment_id: FragmentId,
1450    ) -> GroupTopNMetrics {
1451        let label_list: &[&str; 3] = &[
1452            &table_id.to_string(),
1453            &actor_id.to_string(),
1454            &fragment_id.to_string(),
1455        ];
1456
1457        GroupTopNMetrics {
1458            group_top_n_cache_miss_count: self
1459                .group_top_n_cache_miss_count
1460                .with_guarded_label_values(label_list),
1461            group_top_n_total_query_cache_count: self
1462                .group_top_n_total_query_cache_count
1463                .with_guarded_label_values(label_list),
1464            group_top_n_cached_entry_count: self
1465                .group_top_n_cached_entry_count
1466                .with_guarded_label_values(label_list),
1467        }
1468    }
1469
1470    pub fn new_append_only_group_top_n_metrics(
1471        &self,
1472        table_id: TableId,
1473        actor_id: ActorId,
1474        fragment_id: FragmentId,
1475    ) -> GroupTopNMetrics {
1476        let label_list: &[&str; 3] = &[
1477            &table_id.to_string(),
1478            &actor_id.to_string(),
1479            &fragment_id.to_string(),
1480        ];
1481
1482        GroupTopNMetrics {
1483            group_top_n_cache_miss_count: self
1484                .group_top_n_appendonly_cache_miss_count
1485                .with_guarded_label_values(label_list),
1486            group_top_n_total_query_cache_count: self
1487                .group_top_n_appendonly_total_query_cache_count
1488                .with_guarded_label_values(label_list),
1489            group_top_n_cached_entry_count: self
1490                .group_top_n_appendonly_cached_entry_count
1491                .with_guarded_label_values(label_list),
1492        }
1493    }
1494
1495    pub fn new_lookup_executor_metrics(
1496        &self,
1497        table_id: TableId,
1498        actor_id: ActorId,
1499        fragment_id: FragmentId,
1500    ) -> LookupExecutorMetrics {
1501        let label_list: &[&str; 3] = &[
1502            &table_id.to_string(),
1503            &actor_id.to_string(),
1504            &fragment_id.to_string(),
1505        ];
1506
1507        LookupExecutorMetrics {
1508            lookup_cache_miss_count: self
1509                .lookup_cache_miss_count
1510                .with_guarded_label_values(label_list),
1511            lookup_total_query_cache_count: self
1512                .lookup_total_query_cache_count
1513                .with_guarded_label_values(label_list),
1514            lookup_cached_entry_count: self
1515                .lookup_cached_entry_count
1516                .with_guarded_label_values(label_list),
1517        }
1518    }
1519
1520    pub fn new_hash_agg_metrics(
1521        &self,
1522        table_id: TableId,
1523        actor_id: ActorId,
1524        fragment_id: FragmentId,
1525    ) -> HashAggMetrics {
1526        let label_list: &[&str; 3] = &[
1527            &table_id.to_string(),
1528            &actor_id.to_string(),
1529            &fragment_id.to_string(),
1530        ];
1531        HashAggMetrics {
1532            agg_lookup_miss_count: self
1533                .agg_lookup_miss_count
1534                .with_guarded_label_values(label_list),
1535            agg_total_lookup_count: self
1536                .agg_total_lookup_count
1537                .with_guarded_label_values(label_list),
1538            agg_cached_entry_count: self
1539                .agg_cached_entry_count
1540                .with_guarded_label_values(label_list),
1541            agg_chunk_lookup_miss_count: self
1542                .agg_chunk_lookup_miss_count
1543                .with_guarded_label_values(label_list),
1544            agg_chunk_total_lookup_count: self
1545                .agg_chunk_total_lookup_count
1546                .with_guarded_label_values(label_list),
1547            agg_dirty_groups_count: self
1548                .agg_dirty_groups_count
1549                .with_guarded_label_values(label_list),
1550            agg_dirty_groups_heap_size: self
1551                .agg_dirty_groups_heap_size
1552                .with_guarded_label_values(label_list),
1553            agg_state_cache_lookup_count: self
1554                .agg_state_cache_lookup_count
1555                .with_guarded_label_values(label_list),
1556            agg_state_cache_miss_count: self
1557                .agg_state_cache_miss_count
1558                .with_guarded_label_values(label_list),
1559        }
1560    }
1561
1562    pub fn new_agg_distinct_dedup_metrics(
1563        &self,
1564        table_id: TableId,
1565        actor_id: ActorId,
1566        fragment_id: FragmentId,
1567    ) -> AggDistinctDedupMetrics {
1568        let label_list: &[&str; 3] = &[
1569            &table_id.to_string(),
1570            &actor_id.to_string(),
1571            &fragment_id.to_string(),
1572        ];
1573        AggDistinctDedupMetrics {
1574            agg_distinct_cache_miss_count: self
1575                .agg_distinct_cache_miss_count
1576                .with_guarded_label_values(label_list),
1577            agg_distinct_total_cache_count: self
1578                .agg_distinct_total_cache_count
1579                .with_guarded_label_values(label_list),
1580            agg_distinct_cached_entry_count: self
1581                .agg_distinct_cached_entry_count
1582                .with_guarded_label_values(label_list),
1583        }
1584    }
1585
1586    pub fn new_temporal_join_metrics(
1587        &self,
1588        table_id: TableId,
1589        actor_id: ActorId,
1590        fragment_id: FragmentId,
1591    ) -> TemporalJoinMetrics {
1592        let label_list: &[&str; 3] = &[
1593            &table_id.to_string(),
1594            &actor_id.to_string(),
1595            &fragment_id.to_string(),
1596        ];
1597        TemporalJoinMetrics {
1598            temporal_join_cache_miss_count: self
1599                .temporal_join_cache_miss_count
1600                .with_guarded_label_values(label_list),
1601            temporal_join_total_query_cache_count: self
1602                .temporal_join_total_query_cache_count
1603                .with_guarded_label_values(label_list),
1604            temporal_join_cached_entry_count: self
1605                .temporal_join_cached_entry_count
1606                .with_guarded_label_values(label_list),
1607        }
1608    }
1609
1610    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1611        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1612        BackfillMetrics {
1613            backfill_snapshot_read_row_count: self
1614                .backfill_snapshot_read_row_count
1615                .with_guarded_label_values(label_list),
1616            backfill_upstream_output_row_count: self
1617                .backfill_upstream_output_row_count
1618                .with_guarded_label_values(label_list),
1619        }
1620    }
1621
1622    pub fn new_cdc_backfill_metrics(
1623        &self,
1624        table_id: TableId,
1625        actor_id: ActorId,
1626    ) -> CdcBackfillMetrics {
1627        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1628        CdcBackfillMetrics {
1629            cdc_backfill_snapshot_read_row_count: self
1630                .cdc_backfill_snapshot_read_row_count
1631                .with_guarded_label_values(label_list),
1632            cdc_backfill_upstream_output_row_count: self
1633                .cdc_backfill_upstream_output_row_count
1634                .with_guarded_label_values(label_list),
1635        }
1636    }
1637
1638    pub fn new_over_window_metrics(
1639        &self,
1640        table_id: TableId,
1641        actor_id: ActorId,
1642        fragment_id: FragmentId,
1643    ) -> OverWindowMetrics {
1644        let label_list: &[&str; 3] = &[
1645            &table_id.to_string(),
1646            &actor_id.to_string(),
1647            &fragment_id.to_string(),
1648        ];
1649        OverWindowMetrics {
1650            over_window_cached_entry_count: self
1651                .over_window_cached_entry_count
1652                .with_guarded_label_values(label_list),
1653            over_window_cache_lookup_count: self
1654                .over_window_cache_lookup_count
1655                .with_guarded_label_values(label_list),
1656            over_window_cache_miss_count: self
1657                .over_window_cache_miss_count
1658                .with_guarded_label_values(label_list),
1659            over_window_range_cache_entry_count: self
1660                .over_window_range_cache_entry_count
1661                .with_guarded_label_values(label_list),
1662            over_window_range_cache_lookup_count: self
1663                .over_window_range_cache_lookup_count
1664                .with_guarded_label_values(label_list),
1665            over_window_range_cache_left_miss_count: self
1666                .over_window_range_cache_left_miss_count
1667                .with_guarded_label_values(label_list),
1668            over_window_range_cache_right_miss_count: self
1669                .over_window_range_cache_right_miss_count
1670                .with_guarded_label_values(label_list),
1671            over_window_accessed_entry_count: self
1672                .over_window_accessed_entry_count
1673                .with_guarded_label_values(label_list),
1674            over_window_compute_count: self
1675                .over_window_compute_count
1676                .with_guarded_label_values(label_list),
1677            over_window_same_output_count: self
1678                .over_window_same_output_count
1679                .with_guarded_label_values(label_list),
1680        }
1681    }
1682
1683    pub fn new_materialize_cache_metrics(
1684        &self,
1685        table_id: TableId,
1686        actor_id: ActorId,
1687        fragment_id: FragmentId,
1688    ) -> MaterializeCacheMetrics {
1689        let label_list: &[&str; 3] = &[
1690            &actor_id.to_string(),
1691            &table_id.to_string(),
1692            &fragment_id.to_string(),
1693        ];
1694        MaterializeCacheMetrics {
1695            materialize_cache_hit_count: self
1696                .materialize_cache_hit_count
1697                .with_guarded_label_values(label_list),
1698            materialize_data_exist_count: self
1699                .materialize_data_exist_count
1700                .with_guarded_label_values(label_list),
1701            materialize_cache_total_count: self
1702                .materialize_cache_total_count
1703                .with_guarded_label_values(label_list),
1704        }
1705    }
1706
1707    pub fn new_materialize_metrics(
1708        &self,
1709        table_id: TableId,
1710        actor_id: ActorId,
1711        fragment_id: FragmentId,
1712    ) -> MaterializeMetrics {
1713        let label_list: &[&str; 3] = &[
1714            &actor_id.to_string(),
1715            &table_id.to_string(),
1716            &fragment_id.to_string(),
1717        ];
1718        MaterializeMetrics {
1719            materialize_input_row_count: self
1720                .materialize_input_row_count
1721                .with_guarded_label_values(label_list),
1722            materialize_current_epoch: self
1723                .materialize_current_epoch
1724                .with_guarded_label_values(label_list),
1725        }
1726    }
1727
1728    pub fn new_state_table_metrics(
1729        &self,
1730        table_id: TableId,
1731        actor_id: ActorId,
1732        fragment_id: FragmentId,
1733    ) -> StateTableMetrics {
1734        let label_list: &[&str; 3] = &[
1735            &actor_id.to_string(),
1736            &fragment_id.to_string(),
1737            &table_id.to_string(),
1738        ];
1739        StateTableMetrics {
1740            iter_count: self
1741                .state_table_iter_count
1742                .with_guarded_label_values(label_list),
1743            get_count: self
1744                .state_table_get_count
1745                .with_guarded_label_values(label_list),
1746            iter_vnode_pruned_count: self
1747                .state_table_iter_vnode_pruned_count
1748                .with_guarded_label_values(label_list),
1749            get_vnode_pruned_count: self
1750                .state_table_get_vnode_pruned_count
1751                .with_guarded_label_values(label_list),
1752        }
1753    }
1754}
1755
1756pub(crate) struct ActorInputMetrics {
1757    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
1758    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
1759}
1760
1761/// Tokio metrics for actors
1762pub struct ActorMetrics {
1763    pub actor_scheduled_duration: LabelGuardedIntCounter,
1764    pub actor_scheduled_cnt: LabelGuardedIntCounter,
1765    pub actor_poll_duration: LabelGuardedIntCounter,
1766    pub actor_poll_cnt: LabelGuardedIntCounter,
1767    pub actor_idle_duration: LabelGuardedIntCounter,
1768    pub actor_idle_cnt: LabelGuardedIntCounter,
1769}
1770
1771pub struct SinkExecutorMetrics {
1772    pub sink_input_row_count: LabelGuardedIntCounter,
1773    pub sink_input_bytes: LabelGuardedIntCounter,
1774    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
1775}
1776
1777pub struct MaterializeCacheMetrics {
1778    pub materialize_cache_hit_count: LabelGuardedIntCounter,
1779    pub materialize_data_exist_count: LabelGuardedIntCounter,
1780    pub materialize_cache_total_count: LabelGuardedIntCounter,
1781}
1782
1783pub struct MaterializeMetrics {
1784    pub materialize_input_row_count: LabelGuardedIntCounter,
1785    pub materialize_current_epoch: LabelGuardedIntGauge,
1786}
1787
1788pub struct GroupTopNMetrics {
1789    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
1790    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
1791    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
1792}
1793
1794pub struct LookupExecutorMetrics {
1795    pub lookup_cache_miss_count: LabelGuardedIntCounter,
1796    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
1797    pub lookup_cached_entry_count: LabelGuardedIntGauge,
1798}
1799
1800pub struct HashAggMetrics {
1801    pub agg_lookup_miss_count: LabelGuardedIntCounter,
1802    pub agg_total_lookup_count: LabelGuardedIntCounter,
1803    pub agg_cached_entry_count: LabelGuardedIntGauge,
1804    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
1805    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
1806    pub agg_dirty_groups_count: LabelGuardedIntGauge,
1807    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
1808    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
1809    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
1810}
1811
1812pub struct AggDistinctDedupMetrics {
1813    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
1814    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
1815    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
1816}
1817
1818pub struct TemporalJoinMetrics {
1819    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
1820    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
1821    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
1822}
1823
1824pub struct BackfillMetrics {
1825    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1826    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
1827}
1828
1829#[derive(Clone)]
1830pub struct CdcBackfillMetrics {
1831    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1832    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
1833}
1834
1835pub struct OverWindowMetrics {
1836    pub over_window_cached_entry_count: LabelGuardedIntGauge,
1837    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
1838    pub over_window_cache_miss_count: LabelGuardedIntCounter,
1839    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
1840    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
1841    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
1842    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
1843    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
1844    pub over_window_compute_count: LabelGuardedIntCounter,
1845    pub over_window_same_output_count: LabelGuardedIntCounter,
1846}
1847
1848pub struct StateTableMetrics {
1849    pub iter_count: LabelGuardedIntCounter,
1850    pub get_count: LabelGuardedIntCounter,
1851    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
1852    pub get_vnode_pruned_count: LabelGuardedIntCounter,
1853}