risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26    LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46    pub level: MetricLevel,
47
48    // Executor metrics (disabled by default)
49    pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51    // Profiling Metrics:
52    // Aggregated per operator rather than per actor.
53    // These are purely in-memory, never collected by prometheus.
54    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57    // Streaming actor metrics from tokio (disabled by default)
58    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60    actor_poll_duration: RelabeledGuardedIntCounterVec,
61    actor_poll_cnt: RelabeledGuardedIntCounterVec,
62    actor_idle_duration: RelabeledGuardedIntCounterVec,
63    actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65    // Streaming actor
66    pub actor_count: LabelGuardedIntGaugeVec,
67    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
70
71    // Source
72    pub source_output_row_count: LabelGuardedIntCounterVec,
73    pub source_split_change_count: LabelGuardedIntCounterVec,
74    pub source_backfill_row_count: LabelGuardedIntCounterVec,
75
76    // Sink
77    sink_input_row_count: LabelGuardedIntCounterVec,
78    sink_input_bytes: LabelGuardedIntCounterVec,
79    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
80
81    // Exchange (see also `compute::ExchangeServiceMetrics`)
82    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
83
84    // Streaming Merge (We breakout this metric from `barrier_align_duration` because
85    // the alignment happens on different levels)
86    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
87
88    // Backpressure
89    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
90    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91
92    // Streaming Join
93    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
94    pub join_lookup_total_count: LabelGuardedIntCounterVec,
95    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
96    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
97    pub join_match_duration_ns: LabelGuardedIntCounterVec,
98    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
99    pub join_matched_join_keys: RelabeledGuardedHistogramVec,
100
101    // Streaming Join, Streaming Dynamic Filter and Streaming Union
102    pub barrier_align_duration: RelabeledGuardedIntCounterVec,
103
104    // Streaming Aggregation
105    agg_lookup_miss_count: LabelGuardedIntCounterVec,
106    agg_total_lookup_count: LabelGuardedIntCounterVec,
107    agg_cached_entry_count: LabelGuardedIntGaugeVec,
108    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
109    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
110    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
111    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
112    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
113    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
114    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
115    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
116    agg_state_cache_miss_count: LabelGuardedIntCounterVec,
117
118    // Streaming TopN
119    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
120    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
121    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
122    // TODO(rc): why not just use the above three?
123    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
124    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
125    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
126
127    // Lookup executor
128    lookup_cache_miss_count: LabelGuardedIntCounterVec,
129    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
130    lookup_cached_entry_count: LabelGuardedIntGaugeVec,
131
132    // temporal join
133    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
134    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
135    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
136
137    // Backfill
138    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
139    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
140
141    // CDC Backfill
142    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145    // Snapshot Backfill
146    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
147
148    // Over Window
149    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
150    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
151    over_window_cache_miss_count: LabelGuardedIntCounterVec,
152    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
153    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
154    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
155    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
156    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
157    over_window_compute_count: LabelGuardedIntCounterVec,
158    over_window_same_output_count: LabelGuardedIntCounterVec,
159
160    /// The duration from receipt of barrier to all actors collection.
161    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
162    /// to flow through the graph.
163    pub barrier_inflight_latency: Histogram,
164    /// The duration of sync to storage.
165    pub barrier_sync_latency: Histogram,
166    pub barrier_batch_size: Histogram,
167    /// The progress made by the earliest in-flight barriers in the local barrier manager.
168    pub barrier_manager_progress: IntCounter,
169
170    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
171    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
172    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
173    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
174    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
175    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
176    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
177    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
178    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
179    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
180    pub kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
181
182    pub crossdb_last_consumed_min_epoch: LabelGuardedIntGaugeVec,
183
184    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
185    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
186    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
187    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
188    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
189    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
190    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
191    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
192    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
193    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
194    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
195    pub sync_kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
196
197    // Memory management
198    pub lru_runtime_loop_count: IntCounter,
199    pub lru_latest_sequence: IntGauge,
200    pub lru_watermark_sequence: IntGauge,
201    pub lru_eviction_policy: IntGauge,
202    pub jemalloc_allocated_bytes: IntGauge,
203    pub jemalloc_active_bytes: IntGauge,
204    pub jemalloc_resident_bytes: IntGauge,
205    pub jemalloc_metadata_bytes: IntGauge,
206    pub jvm_allocated_bytes: IntGauge,
207    pub jvm_active_bytes: IntGauge,
208    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210    // Materialized view
211    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214    materialize_input_row_count: RelabeledGuardedIntCounterVec,
215    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217    // PostgreSQL CDC LSN monitoring
218    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221    // MySQL CDC binlog monitoring
222    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
223    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
224
225    // Gap Fill
226    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
227
228    // State Table
229    pub state_table_iter_count: RelabeledGuardedIntCounterVec,
230    pub state_table_get_count: RelabeledGuardedIntCounterVec,
231    pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
232    pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
233}
234
235pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
236
237pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
238    GLOBAL_STREAMING_METRICS
239        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
240        .clone()
241}
242
243impl StreamingMetrics {
244    pub fn new(registry: &Registry, level: MetricLevel) -> Self {
245        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
246            "stream_executor_row_count",
247            "Total number of rows that have been output from each executor",
248            &["actor_id", "fragment_id", "executor_identity"],
249            registry
250        )
251        .unwrap()
252        .relabel_debug_1(level);
253
254        let stream_node_output_row_count = CountMap::new();
255        let stream_node_output_blocking_duration_ns = CountMap::new();
256
257        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
258            "stream_source_output_rows_counts",
259            "Total number of rows that have been output from source",
260            &["source_id", "source_name", "actor_id", "fragment_id"],
261            registry
262        )
263        .unwrap();
264
265        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
266            "stream_source_split_change_event_count",
267            "Total number of split change events that have been operated by source",
268            &["source_id", "source_name", "actor_id", "fragment_id"],
269            registry
270        )
271        .unwrap();
272
273        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
274            "stream_source_backfill_rows_counts",
275            "Total number of rows that have been backfilled for source",
276            &["source_id", "source_name", "actor_id", "fragment_id"],
277            registry
278        )
279        .unwrap();
280
281        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
282            "stream_sink_input_row_count",
283            "Total number of rows streamed into sink executors",
284            &["sink_id", "actor_id", "fragment_id"],
285            registry
286        )
287        .unwrap();
288
289        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
290            "stream_sink_input_bytes",
291            "Total size of chunks streamed into sink executors",
292            &["sink_id", "actor_id", "fragment_id"],
293            registry
294        )
295        .unwrap();
296
297        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
298            "stream_mview_input_row_count",
299            "Total number of rows streamed into materialize executors",
300            &["actor_id", "table_id", "fragment_id"],
301            registry
302        )
303        .unwrap()
304        .relabel_debug_1(level);
305
306        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
307            "stream_mview_current_epoch",
308            "The current epoch of the materialized executor",
309            &["actor_id", "table_id", "fragment_id"],
310            registry
311        )
312        .unwrap()
313        .relabel_debug_1(level);
314
315        let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
316            "stream_pg_cdc_state_table_lsn",
317            "Current LSN value stored in PostgreSQL CDC state table",
318            &["source_id"],
319            registry,
320        )
321        .unwrap();
322
323        let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
324            "stream_pg_cdc_jni_commit_offset_lsn",
325            "LSN value when JNI commit offset is called for PostgreSQL CDC",
326            &["source_id"],
327            registry,
328        )
329        .unwrap();
330
331        let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
332            "stream_mysql_cdc_state_binlog_file_seq",
333            "Current binlog file sequence number stored in MySQL CDC state table",
334            &["source_id"],
335            registry,
336        )
337        .unwrap();
338
339        let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
340            "stream_mysql_cdc_state_binlog_position",
341            "Current binlog position stored in MySQL CDC state table",
342            &["source_id"],
343            registry,
344        )
345        .unwrap();
346
347        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
348            "stream_sink_chunk_buffer_size",
349            "Total size of chunks buffered in a barrier",
350            &["sink_id", "actor_id", "fragment_id"],
351            registry
352        )
353        .unwrap();
354        let actor_output_buffer_blocking_duration_ns =
355            register_guarded_int_counter_vec_with_registry!(
356                "stream_actor_output_buffer_blocking_duration_ns",
357                "Total blocking duration (ns) of output buffer",
358                &["actor_id", "fragment_id", "downstream_fragment_id"],
359                registry
360            )
361            .unwrap()
362            // mask the first label `actor_id` if the level is less verbose than `Debug`
363            .relabel_debug_1(level);
364
365        let actor_input_buffer_blocking_duration_ns =
366            register_guarded_int_counter_vec_with_registry!(
367                "stream_actor_input_buffer_blocking_duration_ns",
368                "Total blocking duration (ns) of input buffer",
369                &["actor_id", "fragment_id", "upstream_fragment_id"],
370                registry
371            )
372            .unwrap()
373            // mask the first label `actor_id` if the level is less verbose than `Debug`
374            .relabel_debug_1(level);
375
376        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
377            "stream_exchange_frag_recv_size",
378            "Total size of messages that have been received from upstream Fragment",
379            &["up_fragment_id", "down_fragment_id"],
380            registry
381        )
382        .unwrap();
383
384        let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
385            "stream_actor_poll_duration",
386            "tokio's metrics",
387            &["actor_id", "fragment_id"],
388            registry
389        )
390        .unwrap()
391        .relabel_debug_1(level);
392
393        let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
394            "stream_actor_poll_cnt",
395            "tokio's metrics",
396            &["actor_id", "fragment_id"],
397            registry
398        )
399        .unwrap()
400        .relabel_debug_1(level);
401
402        let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
403            "stream_actor_scheduled_duration",
404            "tokio's metrics",
405            &["actor_id", "fragment_id"],
406            registry
407        )
408        .unwrap()
409        .relabel_debug_1(level);
410
411        let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
412            "stream_actor_scheduled_cnt",
413            "tokio's metrics",
414            &["actor_id", "fragment_id"],
415            registry
416        )
417        .unwrap()
418        .relabel_debug_1(level);
419
420        let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
421            "stream_actor_idle_duration",
422            "tokio's metrics",
423            &["actor_id", "fragment_id"],
424            registry
425        )
426        .unwrap()
427        .relabel_debug_1(level);
428
429        let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
430            "stream_actor_idle_cnt",
431            "tokio's metrics",
432            &["actor_id", "fragment_id"],
433            registry
434        )
435        .unwrap()
436        .relabel_debug_1(level);
437
438        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
439            "stream_actor_in_record_cnt",
440            "Total number of rows actor received",
441            &["actor_id", "fragment_id", "upstream_fragment_id"],
442            registry
443        )
444        .unwrap()
445        .relabel_debug_1(level);
446
447        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
448            "stream_actor_out_record_cnt",
449            "Total number of rows actor sent",
450            &["actor_id", "fragment_id"],
451            registry
452        )
453        .unwrap()
454        .relabel_debug_1(level);
455
456        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
457            "stream_actor_current_epoch",
458            "Current epoch of actor",
459            &["actor_id", "fragment_id"],
460            registry
461        )
462        .unwrap()
463        .relabel_debug_1(level);
464
465        let actor_count = register_guarded_int_gauge_vec_with_registry!(
466            "stream_actor_count",
467            "Total number of actors (parallelism)",
468            &["fragment_id"],
469            registry
470        )
471        .unwrap();
472
473        let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
474            "stream_merge_barrier_align_duration_ns",
475            "Total merge barrier alignment duration (ns)",
476            &["actor_id", "fragment_id"],
477            registry
478        )
479        .unwrap()
480        .relabel_debug_1(level);
481
482        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
483            "stream_join_lookup_miss_count",
484            "Join executor lookup miss duration",
485            &["side", "join_table_id", "actor_id", "fragment_id"],
486            registry
487        )
488        .unwrap();
489
490        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
491            "stream_join_lookup_total_count",
492            "Join executor lookup total operation",
493            &["side", "join_table_id", "actor_id", "fragment_id"],
494            registry
495        )
496        .unwrap();
497
498        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
499            "stream_join_insert_cache_miss_count",
500            "Join executor cache miss when insert operation",
501            &["side", "join_table_id", "actor_id", "fragment_id"],
502            registry
503        )
504        .unwrap();
505
506        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
507            "stream_join_actor_input_waiting_duration_ns",
508            "Total waiting duration (ns) of input buffer of join actor",
509            &["actor_id", "fragment_id"],
510            registry
511        )
512        .unwrap();
513
514        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
515            "stream_join_match_duration_ns",
516            "Matching duration for each side",
517            &["actor_id", "fragment_id", "side"],
518            registry
519        )
520        .unwrap();
521
522        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
523            "stream_barrier_align_duration_ns",
524            "Duration of join align barrier",
525            &["actor_id", "fragment_id", "wait_side", "executor"],
526            registry
527        )
528        .unwrap()
529        .relabel_debug_1(level);
530
531        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
532            "stream_join_cached_entry_count",
533            "Number of cached entries in streaming join operators",
534            &["actor_id", "fragment_id", "side"],
535            registry
536        )
537        .unwrap();
538
539        let join_matched_join_keys_opts = histogram_opts!(
540            "stream_join_matched_join_keys",
541            "The number of keys matched in the opposite side",
542            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
543        );
544
545        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
546            join_matched_join_keys_opts,
547            &["actor_id", "fragment_id", "table_id"],
548            registry
549        )
550        .unwrap()
551        .relabel_debug_1(level);
552
553        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
554            "stream_agg_lookup_miss_count",
555            "Aggregation executor lookup miss duration",
556            &["table_id", "actor_id", "fragment_id"],
557            registry
558        )
559        .unwrap();
560
561        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
562            "stream_agg_lookup_total_count",
563            "Aggregation executor lookup total operation",
564            &["table_id", "actor_id", "fragment_id"],
565            registry
566        )
567        .unwrap();
568
569        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
570            "stream_agg_distinct_cache_miss_count",
571            "Aggregation executor dinsinct miss duration",
572            &["table_id", "actor_id", "fragment_id"],
573            registry
574        )
575        .unwrap();
576
577        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
578            "stream_agg_distinct_total_cache_count",
579            "Aggregation executor distinct total operation",
580            &["table_id", "actor_id", "fragment_id"],
581            registry
582        )
583        .unwrap();
584
585        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
586            "stream_agg_distinct_cached_entry_count",
587            "Total entry counts in distinct aggregation executor cache",
588            &["table_id", "actor_id", "fragment_id"],
589            registry
590        )
591        .unwrap();
592
593        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
594            "stream_agg_dirty_groups_count",
595            "Total dirty group counts in aggregation executor",
596            &["table_id", "actor_id", "fragment_id"],
597            registry
598        )
599        .unwrap();
600
601        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
602            "stream_agg_dirty_groups_heap_size",
603            "Total dirty group heap size in aggregation executor",
604            &["table_id", "actor_id", "fragment_id"],
605            registry
606        )
607        .unwrap();
608
609        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
610            "stream_agg_state_cache_lookup_count",
611            "Aggregation executor state cache lookup count",
612            &["table_id", "actor_id", "fragment_id"],
613            registry
614        )
615        .unwrap();
616
617        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
618            "stream_agg_state_cache_miss_count",
619            "Aggregation executor state cache miss count",
620            &["table_id", "actor_id", "fragment_id"],
621            registry
622        )
623        .unwrap();
624
625        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
626            "stream_group_top_n_cache_miss_count",
627            "Group top n executor cache miss count",
628            &["table_id", "actor_id", "fragment_id"],
629            registry
630        )
631        .unwrap();
632
633        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
634            "stream_group_top_n_total_query_cache_count",
635            "Group top n executor query cache total count",
636            &["table_id", "actor_id", "fragment_id"],
637            registry
638        )
639        .unwrap();
640
641        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
642            "stream_group_top_n_cached_entry_count",
643            "Total entry counts in group top n executor cache",
644            &["table_id", "actor_id", "fragment_id"],
645            registry
646        )
647        .unwrap();
648
649        let group_top_n_appendonly_cache_miss_count =
650            register_guarded_int_counter_vec_with_registry!(
651                "stream_group_top_n_appendonly_cache_miss_count",
652                "Group top n appendonly executor cache miss count",
653                &["table_id", "actor_id", "fragment_id"],
654                registry
655            )
656            .unwrap();
657
658        let group_top_n_appendonly_total_query_cache_count =
659            register_guarded_int_counter_vec_with_registry!(
660                "stream_group_top_n_appendonly_total_query_cache_count",
661                "Group top n appendonly executor total cache count",
662                &["table_id", "actor_id", "fragment_id"],
663                registry
664            )
665            .unwrap();
666
667        let group_top_n_appendonly_cached_entry_count =
668            register_guarded_int_gauge_vec_with_registry!(
669                "stream_group_top_n_appendonly_cached_entry_count",
670                "Total entry counts in group top n appendonly executor cache",
671                &["table_id", "actor_id", "fragment_id"],
672                registry
673            )
674            .unwrap();
675
676        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
677            "stream_lookup_cache_miss_count",
678            "Lookup executor cache miss count",
679            &["table_id", "actor_id", "fragment_id"],
680            registry
681        )
682        .unwrap();
683
684        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
685            "stream_lookup_total_query_cache_count",
686            "Lookup executor query cache total count",
687            &["table_id", "actor_id", "fragment_id"],
688            registry
689        )
690        .unwrap();
691
692        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
693            "stream_lookup_cached_entry_count",
694            "Total entry counts in lookup executor cache",
695            &["table_id", "actor_id", "fragment_id"],
696            registry
697        )
698        .unwrap();
699
700        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
701            "stream_temporal_join_cache_miss_count",
702            "Temporal join executor cache miss count",
703            &["table_id", "actor_id", "fragment_id"],
704            registry
705        )
706        .unwrap();
707
708        let temporal_join_total_query_cache_count =
709            register_guarded_int_counter_vec_with_registry!(
710                "stream_temporal_join_total_query_cache_count",
711                "Temporal join executor query cache total count",
712                &["table_id", "actor_id", "fragment_id"],
713                registry
714            )
715            .unwrap();
716
717        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
718            "stream_temporal_join_cached_entry_count",
719            "Total entry count in temporal join executor cache",
720            &["table_id", "actor_id", "fragment_id"],
721            registry
722        )
723        .unwrap();
724
725        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726            "stream_agg_cached_entry_count",
727            "Number of cached keys in streaming aggregation operators",
728            &["table_id", "actor_id", "fragment_id"],
729            registry
730        )
731        .unwrap();
732
733        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
734            "stream_agg_chunk_lookup_miss_count",
735            "Aggregation executor chunk-level lookup miss duration",
736            &["table_id", "actor_id", "fragment_id"],
737            registry
738        )
739        .unwrap();
740
741        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
742            "stream_agg_chunk_lookup_total_count",
743            "Aggregation executor chunk-level lookup total operation",
744            &["table_id", "actor_id", "fragment_id"],
745            registry
746        )
747        .unwrap();
748
749        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
750            "stream_backfill_snapshot_read_row_count",
751            "Total number of rows that have been read from the backfill snapshot",
752            &["table_id", "actor_id"],
753            registry
754        )
755        .unwrap();
756
757        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
758            "stream_backfill_upstream_output_row_count",
759            "Total number of rows that have been output from the backfill upstream",
760            &["table_id", "actor_id"],
761            registry
762        )
763        .unwrap();
764
765        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
766            "stream_cdc_backfill_snapshot_read_row_count",
767            "Total number of rows that have been read from the cdc_backfill snapshot",
768            &["table_id", "actor_id"],
769            registry
770        )
771        .unwrap();
772
773        let cdc_backfill_upstream_output_row_count =
774            register_guarded_int_counter_vec_with_registry!(
775                "stream_cdc_backfill_upstream_output_row_count",
776                "Total number of rows that have been output from the cdc_backfill upstream",
777                &["table_id", "actor_id"],
778                registry
779            )
780            .unwrap();
781
782        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
783            "stream_snapshot_backfill_consume_snapshot_row_count",
784            "Total number of rows that have been output from snapshot backfill",
785            &["table_id", "actor_id", "stage"],
786            registry
787        )
788        .unwrap();
789
790        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
791            "stream_over_window_cached_entry_count",
792            "Total entry (partition) count in over window executor cache",
793            &["table_id", "actor_id", "fragment_id"],
794            registry
795        )
796        .unwrap();
797
798        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
799            "stream_over_window_cache_lookup_count",
800            "Over window executor cache lookup count",
801            &["table_id", "actor_id", "fragment_id"],
802            registry
803        )
804        .unwrap();
805
806        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
807            "stream_over_window_cache_miss_count",
808            "Over window executor cache miss count",
809            &["table_id", "actor_id", "fragment_id"],
810            registry
811        )
812        .unwrap();
813
814        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
815            "stream_over_window_range_cache_entry_count",
816            "Over window partition range cache entry count",
817            &["table_id", "actor_id", "fragment_id"],
818            registry,
819        )
820        .unwrap();
821
822        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
823            "stream_over_window_range_cache_lookup_count",
824            "Over window partition range cache lookup count",
825            &["table_id", "actor_id", "fragment_id"],
826            registry
827        )
828        .unwrap();
829
830        let over_window_range_cache_left_miss_count =
831            register_guarded_int_counter_vec_with_registry!(
832                "stream_over_window_range_cache_left_miss_count",
833                "Over window partition range cache left miss count",
834                &["table_id", "actor_id", "fragment_id"],
835                registry
836            )
837            .unwrap();
838
839        let over_window_range_cache_right_miss_count =
840            register_guarded_int_counter_vec_with_registry!(
841                "stream_over_window_range_cache_right_miss_count",
842                "Over window partition range cache right miss count",
843                &["table_id", "actor_id", "fragment_id"],
844                registry
845            )
846            .unwrap();
847
848        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
849            "stream_over_window_accessed_entry_count",
850            "Over window accessed entry count",
851            &["table_id", "actor_id", "fragment_id"],
852            registry
853        )
854        .unwrap();
855
856        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
857            "stream_over_window_compute_count",
858            "Over window compute count",
859            &["table_id", "actor_id", "fragment_id"],
860            registry
861        )
862        .unwrap();
863
864        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
865            "stream_over_window_same_output_count",
866            "Over window same output count",
867            &["table_id", "actor_id", "fragment_id"],
868            registry
869        )
870        .unwrap();
871
872        let opts = histogram_opts!(
873            "stream_barrier_inflight_duration_seconds",
874            "barrier_inflight_latency",
875            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
876        );
877        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
878
879        let opts = histogram_opts!(
880            "stream_barrier_sync_storage_duration_seconds",
881            "barrier_sync_latency",
882            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
883        );
884        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
885
886        let opts = histogram_opts!(
887            "stream_barrier_batch_size",
888            "barrier_batch_size",
889            exponential_buckets(1.0, 2.0, 8).unwrap()
890        );
891        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
892
893        let barrier_manager_progress = register_int_counter_with_registry!(
894            "stream_barrier_manager_progress",
895            "The number of actors that have processed the earliest in-flight barriers",
896            registry
897        )
898        .unwrap();
899
900        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
901            "sync_kv_log_store_wait_next_poll_ns",
902            "Total duration (ns) of waiting for next poll",
903            &["actor_id", "target", "fragment_id", "relation"],
904            registry
905        )
906        .unwrap();
907
908        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
909            "sync_kv_log_store_read_count",
910            "read row count throughput of sync_kv log store",
911            &["type", "actor_id", "target", "fragment_id", "relation"],
912            registry
913        )
914        .unwrap();
915
916        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
917            "sync_kv_log_store_read_size",
918            "read size throughput of sync_kv log store",
919            &["type", "actor_id", "target", "fragment_id", "relation"],
920            registry
921        )
922        .unwrap();
923
924        let sync_kv_log_store_write_pause_duration_ns =
925            register_guarded_int_counter_vec_with_registry!(
926                "sync_kv_log_store_write_pause_duration_ns",
927                "Duration (ns) of sync_kv log store write pause",
928                &["actor_id", "target", "fragment_id", "relation"],
929                registry
930            )
931            .unwrap();
932
933        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
934            "sync_kv_log_store_state",
935            "clean/unclean state transition for sync_kv log store",
936            &["state", "actor_id", "target", "fragment_id", "relation"],
937            registry
938        )
939        .unwrap();
940
941        let sync_kv_log_store_storage_write_count =
942            register_guarded_int_counter_vec_with_registry!(
943                "sync_kv_log_store_storage_write_count",
944                "Write row count throughput of sync_kv log store",
945                &["actor_id", "target", "fragment_id", "relation"],
946                registry
947            )
948            .unwrap();
949
950        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
951            "sync_kv_log_store_storage_write_size",
952            "Write size throughput of sync_kv log store",
953            &["actor_id", "target", "fragment_id", "relation"],
954            registry
955        )
956        .unwrap();
957
958        let sync_kv_log_store_buffer_unconsumed_item_count =
959            register_guarded_int_gauge_vec_with_registry!(
960                "sync_kv_log_store_buffer_unconsumed_item_count",
961                "Number of Unconsumed Item in buffer",
962                &["actor_id", "target", "fragment_id", "relation"],
963                registry
964            )
965            .unwrap();
966
967        let sync_kv_log_store_buffer_unconsumed_row_count =
968            register_guarded_int_gauge_vec_with_registry!(
969                "sync_kv_log_store_buffer_unconsumed_row_count",
970                "Number of Unconsumed Row in buffer",
971                &["actor_id", "target", "fragment_id", "relation"],
972                registry
973            )
974            .unwrap();
975
976        let sync_kv_log_store_buffer_unconsumed_epoch_count =
977            register_guarded_int_gauge_vec_with_registry!(
978                "sync_kv_log_store_buffer_unconsumed_epoch_count",
979                "Number of Unconsumed Epoch in buffer",
980                &["actor_id", "target", "fragment_id", "relation"],
981                registry
982            )
983            .unwrap();
984
985        let sync_kv_log_store_buffer_unconsumed_min_epoch =
986            register_guarded_int_gauge_vec_with_registry!(
987                "sync_kv_log_store_buffer_unconsumed_min_epoch",
988                "Number of Unconsumed Epoch in buffer",
989                &["actor_id", "target", "fragment_id", "relation"],
990                registry
991            )
992            .unwrap();
993        let sync_kv_log_store_buffer_memory_bytes =
994            register_guarded_int_gauge_vec_with_registry!(
995                "sync_kv_log_store_buffer_memory_bytes",
996                "Estimated heap bytes used by synced kv log store buffer (unconsumed + consumed but not truncated)",
997                &["actor_id", "target", "fragment_id", "relation"],
998                registry
999            )
1000            .unwrap();
1001
1002        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1003            "kv_log_store_storage_write_count",
1004            "Write row count throughput of kv log store",
1005            &["actor_id", "connector", "sink_id", "sink_name"],
1006            registry
1007        )
1008        .unwrap();
1009
1010        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1011            "kv_log_store_storage_write_size",
1012            "Write size throughput of kv log store",
1013            &["actor_id", "connector", "sink_id", "sink_name"],
1014            registry
1015        )
1016        .unwrap();
1017
1018        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1019            "kv_log_store_storage_read_count",
1020            "Write row count throughput of kv log store",
1021            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1022            registry
1023        )
1024        .unwrap();
1025
1026        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1027            "kv_log_store_storage_read_size",
1028            "Write size throughput of kv log store",
1029            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1030            registry
1031        )
1032        .unwrap();
1033
1034        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1035            "kv_log_store_rewind_count",
1036            "Kv log store rewind rate",
1037            &["actor_id", "connector", "sink_id", "sink_name"],
1038            registry
1039        )
1040        .unwrap();
1041
1042        let kv_log_store_rewind_delay_opts = {
1043            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1044            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1045                - REWIND_BASE_DELAY.as_secs_f64().log2())
1046            .ceil() as usize;
1047            let buckets = exponential_buckets(
1048                REWIND_BASE_DELAY.as_secs_f64(),
1049                REWIND_BACKOFF_FACTOR as _,
1050                bucket_count,
1051            )
1052            .unwrap();
1053            histogram_opts!(
1054                "kv_log_store_rewind_delay",
1055                "Kv log store rewind delay",
1056                buckets,
1057            )
1058        };
1059
1060        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1061            kv_log_store_rewind_delay_opts,
1062            &["actor_id", "connector", "sink_id", "sink_name"],
1063            registry
1064        )
1065        .unwrap();
1066
1067        let kv_log_store_buffer_unconsumed_item_count =
1068            register_guarded_int_gauge_vec_with_registry!(
1069                "kv_log_store_buffer_unconsumed_item_count",
1070                "Number of Unconsumed Item in buffer",
1071                &["actor_id", "connector", "sink_id", "sink_name"],
1072                registry
1073            )
1074            .unwrap();
1075
1076        let kv_log_store_buffer_unconsumed_row_count =
1077            register_guarded_int_gauge_vec_with_registry!(
1078                "kv_log_store_buffer_unconsumed_row_count",
1079                "Number of Unconsumed Row in buffer",
1080                &["actor_id", "connector", "sink_id", "sink_name"],
1081                registry
1082            )
1083            .unwrap();
1084
1085        let kv_log_store_buffer_unconsumed_epoch_count =
1086            register_guarded_int_gauge_vec_with_registry!(
1087                "kv_log_store_buffer_unconsumed_epoch_count",
1088                "Number of Unconsumed Epoch in buffer",
1089                &["actor_id", "connector", "sink_id", "sink_name"],
1090                registry
1091            )
1092            .unwrap();
1093
1094        let kv_log_store_buffer_unconsumed_min_epoch =
1095            register_guarded_int_gauge_vec_with_registry!(
1096                "kv_log_store_buffer_unconsumed_min_epoch",
1097                "Number of Unconsumed Epoch in buffer",
1098                &["actor_id", "connector", "sink_id", "sink_name"],
1099                registry
1100            )
1101            .unwrap();
1102
1103        let crossdb_last_consumed_min_epoch = register_guarded_int_gauge_vec_with_registry!(
1104            "crossdb_last_consumed_min_epoch",
1105            "Last consumed min epoch for cross-database changelog stream scan",
1106            &["table_id", "actor_id", "fragment_id"],
1107            registry
1108        )
1109        .unwrap();
1110
1111        let kv_log_store_buffer_memory_bytes =
1112            register_guarded_int_gauge_vec_with_registry!(
1113                "kv_log_store_buffer_memory_bytes",
1114                "Estimated heap bytes used by kv log store buffer (unconsumed + consumed but not truncated)",
1115                &["actor_id", "connector", "sink_id", "sink_name"],
1116                registry
1117            )
1118            .unwrap();
1119
1120        let lru_runtime_loop_count = register_int_counter_with_registry!(
1121            "lru_runtime_loop_count",
1122            "The counts of the eviction loop in LRU manager per second",
1123            registry
1124        )
1125        .unwrap();
1126
1127        let lru_latest_sequence = register_int_gauge_with_registry!(
1128            "lru_latest_sequence",
1129            "Current LRU global sequence",
1130            registry,
1131        )
1132        .unwrap();
1133
1134        let lru_watermark_sequence = register_int_gauge_with_registry!(
1135            "lru_watermark_sequence",
1136            "Current LRU watermark sequence",
1137            registry,
1138        )
1139        .unwrap();
1140
1141        let lru_eviction_policy = register_int_gauge_with_registry!(
1142            "lru_eviction_policy",
1143            "Current LRU eviction policy",
1144            registry,
1145        )
1146        .unwrap();
1147
1148        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1149            "jemalloc_allocated_bytes",
1150            "The allocated memory jemalloc, got from jemalloc_ctl",
1151            registry
1152        )
1153        .unwrap();
1154
1155        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1156            "jemalloc_active_bytes",
1157            "The active memory jemalloc, got from jemalloc_ctl",
1158            registry
1159        )
1160        .unwrap();
1161
1162        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1163            "jemalloc_resident_bytes",
1164            "The active memory jemalloc, got from jemalloc_ctl",
1165            registry
1166        )
1167        .unwrap();
1168
1169        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1170            "jemalloc_metadata_bytes",
1171            "The active memory jemalloc, got from jemalloc_ctl",
1172            registry
1173        )
1174        .unwrap();
1175
1176        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1177            "jvm_allocated_bytes",
1178            "The allocated jvm memory",
1179            registry
1180        )
1181        .unwrap();
1182
1183        let jvm_active_bytes = register_int_gauge_with_registry!(
1184            "jvm_active_bytes",
1185            "The active jvm memory",
1186            registry
1187        )
1188        .unwrap();
1189
1190        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1191            "stream_materialize_cache_hit_count",
1192            "Materialize executor cache hit count",
1193            &["actor_id", "table_id", "fragment_id"],
1194            registry
1195        )
1196        .unwrap()
1197        .relabel_debug_1(level);
1198
1199        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1200            "stream_materialize_data_exist_count",
1201            "Materialize executor data exist count",
1202            &["actor_id", "table_id", "fragment_id"],
1203            registry
1204        )
1205        .unwrap()
1206        .relabel_debug_1(level);
1207
1208        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1209            "stream_materialize_cache_total_count",
1210            "Materialize executor cache total operation",
1211            &["actor_id", "table_id", "fragment_id"],
1212            registry
1213        )
1214        .unwrap()
1215        .relabel_debug_1(level);
1216
1217        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1218            "stream_memory_usage",
1219            "Memory usage for stream executors",
1220            &["actor_id", "table_id", "desc"],
1221            registry
1222        )
1223        .unwrap()
1224        .relabel_debug_1(level);
1225
1226        let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1227            "gap_fill_generated_rows_count",
1228            "Total number of rows generated by gap fill executor",
1229            &["actor_id", "fragment_id"],
1230            registry
1231        )
1232        .unwrap()
1233        .relabel_debug_1(level);
1234
1235        let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1236            "state_table_iter_count",
1237            "Total number of state table iter operations",
1238            &["actor_id", "fragment_id", "table_id"],
1239            registry
1240        )
1241        .unwrap()
1242        .relabel_debug_1(level);
1243
1244        let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1245            "state_table_get_count",
1246            "Total number of state table get operations",
1247            &["actor_id", "fragment_id", "table_id"],
1248            registry
1249        )
1250        .unwrap()
1251        .relabel_debug_1(level);
1252
1253        let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1254            "state_table_iter_vnode_pruned_count",
1255            "Total number of state table iter operations pruned by vnode statistics",
1256            &["actor_id", "fragment_id", "table_id"],
1257            registry
1258        )
1259        .unwrap()
1260        .relabel_debug_1(level);
1261
1262        let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1263            "state_table_get_vnode_pruned_count",
1264            "Total number of state table get operations pruned by vnode statistics",
1265            &["actor_id", "fragment_id", "table_id"],
1266            registry
1267        )
1268        .unwrap()
1269        .relabel_debug_1(level);
1270
1271        Self {
1272            level,
1273            executor_row_count,
1274            mem_stream_node_output_row_count: stream_node_output_row_count,
1275            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1276            actor_scheduled_duration,
1277            actor_scheduled_cnt,
1278            actor_poll_duration,
1279            actor_poll_cnt,
1280            actor_idle_duration,
1281            actor_idle_cnt,
1282            actor_count,
1283            actor_in_record_cnt,
1284            actor_out_record_cnt,
1285            actor_current_epoch,
1286            source_output_row_count,
1287            source_split_change_count,
1288            source_backfill_row_count,
1289            sink_input_row_count,
1290            sink_input_bytes,
1291            sink_chunk_buffer_size,
1292            exchange_frag_recv_size,
1293            merge_barrier_align_duration,
1294            actor_output_buffer_blocking_duration_ns,
1295            actor_input_buffer_blocking_duration_ns,
1296            join_lookup_miss_count,
1297            join_lookup_total_count,
1298            join_insert_cache_miss_count,
1299            join_actor_input_waiting_duration_ns,
1300            join_match_duration_ns,
1301            join_cached_entry_count,
1302            join_matched_join_keys,
1303            barrier_align_duration,
1304            agg_lookup_miss_count,
1305            agg_total_lookup_count,
1306            agg_cached_entry_count,
1307            agg_chunk_lookup_miss_count,
1308            agg_chunk_total_lookup_count,
1309            agg_dirty_groups_count,
1310            agg_dirty_groups_heap_size,
1311            agg_distinct_cache_miss_count,
1312            agg_distinct_total_cache_count,
1313            agg_distinct_cached_entry_count,
1314            agg_state_cache_lookup_count,
1315            agg_state_cache_miss_count,
1316            group_top_n_cache_miss_count,
1317            group_top_n_total_query_cache_count,
1318            group_top_n_cached_entry_count,
1319            group_top_n_appendonly_cache_miss_count,
1320            group_top_n_appendonly_total_query_cache_count,
1321            group_top_n_appendonly_cached_entry_count,
1322            lookup_cache_miss_count,
1323            lookup_total_query_cache_count,
1324            lookup_cached_entry_count,
1325            temporal_join_cache_miss_count,
1326            temporal_join_total_query_cache_count,
1327            temporal_join_cached_entry_count,
1328            backfill_snapshot_read_row_count,
1329            backfill_upstream_output_row_count,
1330            cdc_backfill_snapshot_read_row_count,
1331            cdc_backfill_upstream_output_row_count,
1332            snapshot_backfill_consume_row_count,
1333            over_window_cached_entry_count,
1334            over_window_cache_lookup_count,
1335            over_window_cache_miss_count,
1336            over_window_range_cache_entry_count,
1337            over_window_range_cache_lookup_count,
1338            over_window_range_cache_left_miss_count,
1339            over_window_range_cache_right_miss_count,
1340            over_window_accessed_entry_count,
1341            over_window_compute_count,
1342            over_window_same_output_count,
1343            barrier_inflight_latency,
1344            barrier_sync_latency,
1345            barrier_batch_size,
1346            barrier_manager_progress,
1347            kv_log_store_storage_write_count,
1348            kv_log_store_storage_write_size,
1349            kv_log_store_rewind_count,
1350            kv_log_store_rewind_delay,
1351            kv_log_store_storage_read_count,
1352            kv_log_store_storage_read_size,
1353            kv_log_store_buffer_unconsumed_item_count,
1354            kv_log_store_buffer_unconsumed_row_count,
1355            kv_log_store_buffer_unconsumed_epoch_count,
1356            kv_log_store_buffer_unconsumed_min_epoch,
1357            kv_log_store_buffer_memory_bytes,
1358            crossdb_last_consumed_min_epoch,
1359            sync_kv_log_store_read_count,
1360            sync_kv_log_store_read_size,
1361            sync_kv_log_store_write_pause_duration_ns,
1362            sync_kv_log_store_state,
1363            sync_kv_log_store_wait_next_poll_ns,
1364            sync_kv_log_store_storage_write_count,
1365            sync_kv_log_store_storage_write_size,
1366            sync_kv_log_store_buffer_unconsumed_item_count,
1367            sync_kv_log_store_buffer_unconsumed_row_count,
1368            sync_kv_log_store_buffer_unconsumed_epoch_count,
1369            sync_kv_log_store_buffer_unconsumed_min_epoch,
1370            sync_kv_log_store_buffer_memory_bytes,
1371            lru_runtime_loop_count,
1372            lru_latest_sequence,
1373            lru_watermark_sequence,
1374            lru_eviction_policy,
1375            jemalloc_allocated_bytes,
1376            jemalloc_active_bytes,
1377            jemalloc_resident_bytes,
1378            jemalloc_metadata_bytes,
1379            jvm_allocated_bytes,
1380            jvm_active_bytes,
1381            stream_memory_usage,
1382            materialize_cache_hit_count,
1383            materialize_data_exist_count,
1384            materialize_cache_total_count,
1385            materialize_input_row_count,
1386            materialize_current_epoch,
1387            pg_cdc_state_table_lsn,
1388            pg_cdc_jni_commit_offset_lsn,
1389            mysql_cdc_state_binlog_file_seq,
1390            mysql_cdc_state_binlog_position,
1391            gap_fill_generated_rows_count,
1392            state_table_iter_count,
1393            state_table_get_count,
1394            state_table_iter_vnode_pruned_count,
1395            state_table_get_vnode_pruned_count,
1396        }
1397    }
1398
1399    /// Create a new `StreamingMetrics` instance used in tests or other places.
1400    pub fn unused() -> Self {
1401        global_streaming_metrics(MetricLevel::Disabled)
1402    }
1403
1404    pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1405        let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1406        let actor_scheduled_duration = self
1407            .actor_scheduled_duration
1408            .with_guarded_label_values(label_list);
1409        let actor_scheduled_cnt = self
1410            .actor_scheduled_cnt
1411            .with_guarded_label_values(label_list);
1412        let actor_poll_duration = self
1413            .actor_poll_duration
1414            .with_guarded_label_values(label_list);
1415        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1416        let actor_idle_duration = self
1417            .actor_idle_duration
1418            .with_guarded_label_values(label_list);
1419        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1420        ActorMetrics {
1421            actor_scheduled_duration,
1422            actor_scheduled_cnt,
1423            actor_poll_duration,
1424            actor_poll_cnt,
1425            actor_idle_duration,
1426            actor_idle_cnt,
1427        }
1428    }
1429
1430    pub(crate) fn new_actor_input_metrics(
1431        &self,
1432        actor_id: ActorId,
1433        fragment_id: FragmentId,
1434        upstream_fragment_id: FragmentId,
1435    ) -> ActorInputMetrics {
1436        let actor_id_str = actor_id.to_string();
1437        let fragment_id_str = fragment_id.to_string();
1438        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1439        ActorInputMetrics {
1440            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1441                &actor_id_str,
1442                &fragment_id_str,
1443                &upstream_fragment_id_str,
1444            ]),
1445            actor_input_buffer_blocking_duration_ns: self
1446                .actor_input_buffer_blocking_duration_ns
1447                .with_guarded_label_values(&[
1448                    &actor_id_str,
1449                    &fragment_id_str,
1450                    &upstream_fragment_id_str,
1451                ]),
1452        }
1453    }
1454
1455    pub fn new_sink_exec_metrics(
1456        &self,
1457        id: SinkId,
1458        actor_id: ActorId,
1459        fragment_id: FragmentId,
1460    ) -> SinkExecutorMetrics {
1461        let label_list: &[&str; 3] = &[
1462            &id.to_string(),
1463            &actor_id.to_string(),
1464            &fragment_id.to_string(),
1465        ];
1466        SinkExecutorMetrics {
1467            sink_input_row_count: self
1468                .sink_input_row_count
1469                .with_guarded_label_values(label_list),
1470            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1471            sink_chunk_buffer_size: self
1472                .sink_chunk_buffer_size
1473                .with_guarded_label_values(label_list),
1474        }
1475    }
1476
1477    pub fn new_group_top_n_metrics(
1478        &self,
1479        table_id: TableId,
1480        actor_id: ActorId,
1481        fragment_id: FragmentId,
1482    ) -> GroupTopNMetrics {
1483        let label_list: &[&str; 3] = &[
1484            &table_id.to_string(),
1485            &actor_id.to_string(),
1486            &fragment_id.to_string(),
1487        ];
1488
1489        GroupTopNMetrics {
1490            group_top_n_cache_miss_count: self
1491                .group_top_n_cache_miss_count
1492                .with_guarded_label_values(label_list),
1493            group_top_n_total_query_cache_count: self
1494                .group_top_n_total_query_cache_count
1495                .with_guarded_label_values(label_list),
1496            group_top_n_cached_entry_count: self
1497                .group_top_n_cached_entry_count
1498                .with_guarded_label_values(label_list),
1499        }
1500    }
1501
1502    pub fn new_append_only_group_top_n_metrics(
1503        &self,
1504        table_id: TableId,
1505        actor_id: ActorId,
1506        fragment_id: FragmentId,
1507    ) -> GroupTopNMetrics {
1508        let label_list: &[&str; 3] = &[
1509            &table_id.to_string(),
1510            &actor_id.to_string(),
1511            &fragment_id.to_string(),
1512        ];
1513
1514        GroupTopNMetrics {
1515            group_top_n_cache_miss_count: self
1516                .group_top_n_appendonly_cache_miss_count
1517                .with_guarded_label_values(label_list),
1518            group_top_n_total_query_cache_count: self
1519                .group_top_n_appendonly_total_query_cache_count
1520                .with_guarded_label_values(label_list),
1521            group_top_n_cached_entry_count: self
1522                .group_top_n_appendonly_cached_entry_count
1523                .with_guarded_label_values(label_list),
1524        }
1525    }
1526
1527    pub fn new_lookup_executor_metrics(
1528        &self,
1529        table_id: TableId,
1530        actor_id: ActorId,
1531        fragment_id: FragmentId,
1532    ) -> LookupExecutorMetrics {
1533        let label_list: &[&str; 3] = &[
1534            &table_id.to_string(),
1535            &actor_id.to_string(),
1536            &fragment_id.to_string(),
1537        ];
1538
1539        LookupExecutorMetrics {
1540            lookup_cache_miss_count: self
1541                .lookup_cache_miss_count
1542                .with_guarded_label_values(label_list),
1543            lookup_total_query_cache_count: self
1544                .lookup_total_query_cache_count
1545                .with_guarded_label_values(label_list),
1546            lookup_cached_entry_count: self
1547                .lookup_cached_entry_count
1548                .with_guarded_label_values(label_list),
1549        }
1550    }
1551
1552    pub fn new_hash_agg_metrics(
1553        &self,
1554        table_id: TableId,
1555        actor_id: ActorId,
1556        fragment_id: FragmentId,
1557    ) -> HashAggMetrics {
1558        let label_list: &[&str; 3] = &[
1559            &table_id.to_string(),
1560            &actor_id.to_string(),
1561            &fragment_id.to_string(),
1562        ];
1563        HashAggMetrics {
1564            agg_lookup_miss_count: self
1565                .agg_lookup_miss_count
1566                .with_guarded_label_values(label_list),
1567            agg_total_lookup_count: self
1568                .agg_total_lookup_count
1569                .with_guarded_label_values(label_list),
1570            agg_cached_entry_count: self
1571                .agg_cached_entry_count
1572                .with_guarded_label_values(label_list),
1573            agg_chunk_lookup_miss_count: self
1574                .agg_chunk_lookup_miss_count
1575                .with_guarded_label_values(label_list),
1576            agg_chunk_total_lookup_count: self
1577                .agg_chunk_total_lookup_count
1578                .with_guarded_label_values(label_list),
1579            agg_dirty_groups_count: self
1580                .agg_dirty_groups_count
1581                .with_guarded_label_values(label_list),
1582            agg_dirty_groups_heap_size: self
1583                .agg_dirty_groups_heap_size
1584                .with_guarded_label_values(label_list),
1585            agg_state_cache_lookup_count: self
1586                .agg_state_cache_lookup_count
1587                .with_guarded_label_values(label_list),
1588            agg_state_cache_miss_count: self
1589                .agg_state_cache_miss_count
1590                .with_guarded_label_values(label_list),
1591        }
1592    }
1593
1594    pub fn new_agg_distinct_dedup_metrics(
1595        &self,
1596        table_id: TableId,
1597        actor_id: ActorId,
1598        fragment_id: FragmentId,
1599    ) -> AggDistinctDedupMetrics {
1600        let label_list: &[&str; 3] = &[
1601            &table_id.to_string(),
1602            &actor_id.to_string(),
1603            &fragment_id.to_string(),
1604        ];
1605        AggDistinctDedupMetrics {
1606            agg_distinct_cache_miss_count: self
1607                .agg_distinct_cache_miss_count
1608                .with_guarded_label_values(label_list),
1609            agg_distinct_total_cache_count: self
1610                .agg_distinct_total_cache_count
1611                .with_guarded_label_values(label_list),
1612            agg_distinct_cached_entry_count: self
1613                .agg_distinct_cached_entry_count
1614                .with_guarded_label_values(label_list),
1615        }
1616    }
1617
1618    pub fn new_temporal_join_metrics(
1619        &self,
1620        table_id: TableId,
1621        actor_id: ActorId,
1622        fragment_id: FragmentId,
1623    ) -> TemporalJoinMetrics {
1624        let label_list: &[&str; 3] = &[
1625            &table_id.to_string(),
1626            &actor_id.to_string(),
1627            &fragment_id.to_string(),
1628        ];
1629        TemporalJoinMetrics {
1630            temporal_join_cache_miss_count: self
1631                .temporal_join_cache_miss_count
1632                .with_guarded_label_values(label_list),
1633            temporal_join_total_query_cache_count: self
1634                .temporal_join_total_query_cache_count
1635                .with_guarded_label_values(label_list),
1636            temporal_join_cached_entry_count: self
1637                .temporal_join_cached_entry_count
1638                .with_guarded_label_values(label_list),
1639        }
1640    }
1641
1642    pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1643        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1644        BackfillMetrics {
1645            backfill_snapshot_read_row_count: self
1646                .backfill_snapshot_read_row_count
1647                .with_guarded_label_values(label_list),
1648            backfill_upstream_output_row_count: self
1649                .backfill_upstream_output_row_count
1650                .with_guarded_label_values(label_list),
1651        }
1652    }
1653
1654    pub fn new_cdc_backfill_metrics(
1655        &self,
1656        table_id: TableId,
1657        actor_id: ActorId,
1658    ) -> CdcBackfillMetrics {
1659        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1660        CdcBackfillMetrics {
1661            cdc_backfill_snapshot_read_row_count: self
1662                .cdc_backfill_snapshot_read_row_count
1663                .with_guarded_label_values(label_list),
1664            cdc_backfill_upstream_output_row_count: self
1665                .cdc_backfill_upstream_output_row_count
1666                .with_guarded_label_values(label_list),
1667        }
1668    }
1669
1670    pub fn new_over_window_metrics(
1671        &self,
1672        table_id: TableId,
1673        actor_id: ActorId,
1674        fragment_id: FragmentId,
1675    ) -> OverWindowMetrics {
1676        let label_list: &[&str; 3] = &[
1677            &table_id.to_string(),
1678            &actor_id.to_string(),
1679            &fragment_id.to_string(),
1680        ];
1681        OverWindowMetrics {
1682            over_window_cached_entry_count: self
1683                .over_window_cached_entry_count
1684                .with_guarded_label_values(label_list),
1685            over_window_cache_lookup_count: self
1686                .over_window_cache_lookup_count
1687                .with_guarded_label_values(label_list),
1688            over_window_cache_miss_count: self
1689                .over_window_cache_miss_count
1690                .with_guarded_label_values(label_list),
1691            over_window_range_cache_entry_count: self
1692                .over_window_range_cache_entry_count
1693                .with_guarded_label_values(label_list),
1694            over_window_range_cache_lookup_count: self
1695                .over_window_range_cache_lookup_count
1696                .with_guarded_label_values(label_list),
1697            over_window_range_cache_left_miss_count: self
1698                .over_window_range_cache_left_miss_count
1699                .with_guarded_label_values(label_list),
1700            over_window_range_cache_right_miss_count: self
1701                .over_window_range_cache_right_miss_count
1702                .with_guarded_label_values(label_list),
1703            over_window_accessed_entry_count: self
1704                .over_window_accessed_entry_count
1705                .with_guarded_label_values(label_list),
1706            over_window_compute_count: self
1707                .over_window_compute_count
1708                .with_guarded_label_values(label_list),
1709            over_window_same_output_count: self
1710                .over_window_same_output_count
1711                .with_guarded_label_values(label_list),
1712        }
1713    }
1714
1715    pub fn new_materialize_cache_metrics(
1716        &self,
1717        table_id: TableId,
1718        actor_id: ActorId,
1719        fragment_id: FragmentId,
1720    ) -> MaterializeCacheMetrics {
1721        let label_list: &[&str; 3] = &[
1722            &actor_id.to_string(),
1723            &table_id.to_string(),
1724            &fragment_id.to_string(),
1725        ];
1726        MaterializeCacheMetrics {
1727            materialize_cache_hit_count: self
1728                .materialize_cache_hit_count
1729                .with_guarded_label_values(label_list),
1730            materialize_data_exist_count: self
1731                .materialize_data_exist_count
1732                .with_guarded_label_values(label_list),
1733            materialize_cache_total_count: self
1734                .materialize_cache_total_count
1735                .with_guarded_label_values(label_list),
1736        }
1737    }
1738
1739    pub fn new_materialize_metrics(
1740        &self,
1741        table_id: TableId,
1742        actor_id: ActorId,
1743        fragment_id: FragmentId,
1744    ) -> MaterializeMetrics {
1745        let label_list: &[&str; 3] = &[
1746            &actor_id.to_string(),
1747            &table_id.to_string(),
1748            &fragment_id.to_string(),
1749        ];
1750        MaterializeMetrics {
1751            materialize_input_row_count: self
1752                .materialize_input_row_count
1753                .with_guarded_label_values(label_list),
1754            materialize_current_epoch: self
1755                .materialize_current_epoch
1756                .with_guarded_label_values(label_list),
1757        }
1758    }
1759
1760    pub fn new_state_table_metrics(
1761        &self,
1762        table_id: TableId,
1763        actor_id: ActorId,
1764        fragment_id: FragmentId,
1765    ) -> StateTableMetrics {
1766        let label_list: &[&str; 3] = &[
1767            &actor_id.to_string(),
1768            &fragment_id.to_string(),
1769            &table_id.to_string(),
1770        ];
1771        StateTableMetrics {
1772            iter_count: self
1773                .state_table_iter_count
1774                .with_guarded_label_values(label_list),
1775            get_count: self
1776                .state_table_get_count
1777                .with_guarded_label_values(label_list),
1778            iter_vnode_pruned_count: self
1779                .state_table_iter_vnode_pruned_count
1780                .with_guarded_label_values(label_list),
1781            get_vnode_pruned_count: self
1782                .state_table_get_vnode_pruned_count
1783                .with_guarded_label_values(label_list),
1784        }
1785    }
1786}
1787
1788pub(crate) struct ActorInputMetrics {
1789    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
1790    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
1791}
1792
1793/// Tokio metrics for actors
1794pub struct ActorMetrics {
1795    pub actor_scheduled_duration: LabelGuardedIntCounter,
1796    pub actor_scheduled_cnt: LabelGuardedIntCounter,
1797    pub actor_poll_duration: LabelGuardedIntCounter,
1798    pub actor_poll_cnt: LabelGuardedIntCounter,
1799    pub actor_idle_duration: LabelGuardedIntCounter,
1800    pub actor_idle_cnt: LabelGuardedIntCounter,
1801}
1802
1803pub struct SinkExecutorMetrics {
1804    pub sink_input_row_count: LabelGuardedIntCounter,
1805    pub sink_input_bytes: LabelGuardedIntCounter,
1806    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
1807}
1808
1809pub struct MaterializeCacheMetrics {
1810    pub materialize_cache_hit_count: LabelGuardedIntCounter,
1811    pub materialize_data_exist_count: LabelGuardedIntCounter,
1812    pub materialize_cache_total_count: LabelGuardedIntCounter,
1813}
1814
1815pub struct MaterializeMetrics {
1816    pub materialize_input_row_count: LabelGuardedIntCounter,
1817    pub materialize_current_epoch: LabelGuardedIntGauge,
1818}
1819
1820pub struct GroupTopNMetrics {
1821    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
1822    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
1823    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
1824}
1825
1826pub struct LookupExecutorMetrics {
1827    pub lookup_cache_miss_count: LabelGuardedIntCounter,
1828    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
1829    pub lookup_cached_entry_count: LabelGuardedIntGauge,
1830}
1831
1832pub struct HashAggMetrics {
1833    pub agg_lookup_miss_count: LabelGuardedIntCounter,
1834    pub agg_total_lookup_count: LabelGuardedIntCounter,
1835    pub agg_cached_entry_count: LabelGuardedIntGauge,
1836    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
1837    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
1838    pub agg_dirty_groups_count: LabelGuardedIntGauge,
1839    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
1840    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
1841    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
1842}
1843
1844pub struct AggDistinctDedupMetrics {
1845    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
1846    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
1847    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
1848}
1849
1850pub struct TemporalJoinMetrics {
1851    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
1852    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
1853    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
1854}
1855
1856pub struct BackfillMetrics {
1857    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1858    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
1859}
1860
1861#[derive(Clone)]
1862pub struct CdcBackfillMetrics {
1863    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1864    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
1865}
1866
1867pub struct OverWindowMetrics {
1868    pub over_window_cached_entry_count: LabelGuardedIntGauge,
1869    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
1870    pub over_window_cache_miss_count: LabelGuardedIntCounter,
1871    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
1872    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
1873    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
1874    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
1875    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
1876    pub over_window_compute_count: LabelGuardedIntCounter,
1877    pub over_window_same_output_count: LabelGuardedIntCounter,
1878}
1879
1880pub struct StateTableMetrics {
1881    pub iter_count: LabelGuardedIntCounter,
1882    pub get_count: LabelGuardedIntCounter,
1883    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
1884    pub get_vnode_pruned_count: LabelGuardedIntCounter,
1885}