risingwave_stream/executor/monitor/
streaming_stats.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45    pub level: MetricLevel,
46
47    // Executor metrics (disabled by default)
48    pub executor_row_count: RelabeledGuardedIntCounterVec<3>,
49
50    // Profiling Metrics:
51    // Aggregated per operator rather than per actor.
52    // These are purely in-memory, never collected by prometheus.
53    pub mem_stream_node_output_row_count: CountMap,
54    pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56    // Streaming actor metrics from tokio (disabled by default)
57    actor_execution_time: LabelGuardedGaugeVec<1>,
58    actor_scheduled_duration: LabelGuardedGaugeVec<1>,
59    actor_scheduled_cnt: LabelGuardedIntGaugeVec<1>,
60    actor_fast_poll_duration: LabelGuardedGaugeVec<1>,
61    actor_fast_poll_cnt: LabelGuardedIntGaugeVec<1>,
62    actor_slow_poll_duration: LabelGuardedGaugeVec<1>,
63    actor_slow_poll_cnt: LabelGuardedIntGaugeVec<1>,
64    actor_poll_duration: LabelGuardedGaugeVec<1>,
65    actor_poll_cnt: LabelGuardedIntGaugeVec<1>,
66    actor_idle_duration: LabelGuardedGaugeVec<1>,
67    actor_idle_cnt: LabelGuardedIntGaugeVec<1>,
68
69    // Streaming actor
70    pub actor_count: LabelGuardedIntGaugeVec<1>,
71    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec<3>,
72    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec<2>,
73    pub actor_current_epoch: RelabeledGuardedIntGaugeVec<2>,
74
75    // Source
76    pub source_output_row_count: LabelGuardedIntCounterVec<4>,
77    pub source_split_change_count: LabelGuardedIntCounterVec<4>,
78    pub source_backfill_row_count: LabelGuardedIntCounterVec<4>,
79
80    // Sink
81    sink_input_row_count: LabelGuardedIntCounterVec<3>,
82    sink_input_bytes: LabelGuardedIntCounterVec<3>,
83    sink_chunk_buffer_size: LabelGuardedIntGaugeVec<3>,
84
85    // Exchange (see also `compute::ExchangeServiceMetrics`)
86    pub exchange_frag_recv_size: LabelGuardedIntCounterVec<2>,
87
88    // Streaming Merge (We break out this metric from `barrier_align_duration` because
89    // the alignment happens on different levels)
90    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec<2>,
91
92    // Backpressure
93    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec<3>,
94    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
95
96    // Streaming Join
97    pub join_lookup_miss_count: LabelGuardedIntCounterVec<4>,
98    pub join_lookup_total_count: LabelGuardedIntCounterVec<4>,
99    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec<4>,
100    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec<2>,
101    pub join_match_duration_ns: LabelGuardedIntCounterVec<3>,
102    pub join_cached_entry_count: LabelGuardedIntGaugeVec<3>,
103    pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>,
104
105    // Streaming Join, Streaming Dynamic Filter and Streaming Union
106    pub barrier_align_duration: RelabeledGuardedIntCounterVec<4>,
107
108    // Streaming Aggregation
109    agg_lookup_miss_count: LabelGuardedIntCounterVec<3>,
110    agg_total_lookup_count: LabelGuardedIntCounterVec<3>,
111    agg_cached_entry_count: LabelGuardedIntGaugeVec<3>,
112    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec<3>,
113    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec<3>,
114    agg_dirty_groups_count: LabelGuardedIntGaugeVec<3>,
115    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec<3>,
116    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec<3>,
117    agg_distinct_total_cache_count: LabelGuardedIntCounterVec<3>,
118    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec<3>,
119    agg_state_cache_lookup_count: LabelGuardedIntCounterVec<3>,
120    agg_state_cache_miss_count: LabelGuardedIntCounterVec<3>,
121
122    // Streaming TopN
123    group_top_n_cache_miss_count: LabelGuardedIntCounterVec<3>,
124    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec<3>,
125    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec<3>,
126    // TODO(rc): why not just use the above three?
127    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec<3>,
128    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec<3>,
129    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec<3>,
130
131    // Lookup executor
132    lookup_cache_miss_count: LabelGuardedIntCounterVec<3>,
133    lookup_total_query_cache_count: LabelGuardedIntCounterVec<3>,
134    lookup_cached_entry_count: LabelGuardedIntGaugeVec<3>,
135
136    // temporal join
137    temporal_join_cache_miss_count: LabelGuardedIntCounterVec<3>,
138    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec<3>,
139    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec<3>,
140
141    // Backfill
142    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec<2>,
143    backfill_upstream_output_row_count: LabelGuardedIntCounterVec<2>,
144
145    // CDC Backfill
146    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec<2>,
147    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec<2>,
148
149    // Snapshot Backfill
150    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec<3>,
151
152    // Over Window
153    over_window_cached_entry_count: LabelGuardedIntGaugeVec<3>,
154    over_window_cache_lookup_count: LabelGuardedIntCounterVec<3>,
155    over_window_cache_miss_count: LabelGuardedIntCounterVec<3>,
156    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec<3>,
157    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec<3>,
158    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec<3>,
159    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec<3>,
160    over_window_accessed_entry_count: LabelGuardedIntCounterVec<3>,
161    over_window_compute_count: LabelGuardedIntCounterVec<3>,
162    over_window_same_output_count: LabelGuardedIntCounterVec<3>,
163
164    /// The duration from receipt of barrier to all actors collection.
165    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
166    /// to flow through the graph.
167    pub barrier_inflight_latency: Histogram,
168    /// The duration of sync to storage.
169    pub barrier_sync_latency: Histogram,
170    pub barrier_batch_size: Histogram,
171    /// The progress made by the earliest in-flight barriers in the local barrier manager.
172    pub barrier_manager_progress: IntCounter,
173
174    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>,
175    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>,
176    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec<4>,
177    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec<4>,
178    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec<5>,
179    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec<5>,
180    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec<4>,
181    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec<4>,
182    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>,
183    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>,
184
185    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec<5>,
186    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec<5>,
187    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec<4>,
188    pub sync_kv_log_store_state: LabelGuardedIntCounterVec<5>,
189    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec<4>,
190    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>,
191    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>,
192    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec<4>,
193    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec<4>,
194    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>,
195    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>,
196
197    // Memory management
198    pub lru_runtime_loop_count: IntCounter,
199    pub lru_latest_sequence: IntGauge,
200    pub lru_watermark_sequence: IntGauge,
201    pub lru_eviction_policy: IntGauge,
202    pub jemalloc_allocated_bytes: IntGauge,
203    pub jemalloc_active_bytes: IntGauge,
204    pub jemalloc_resident_bytes: IntGauge,
205    pub jemalloc_metadata_bytes: IntGauge,
206    pub jvm_allocated_bytes: IntGauge,
207    pub jvm_active_bytes: IntGauge,
208    pub stream_memory_usage: RelabeledGuardedIntGaugeVec<3>,
209
210    // Materialized view
211    materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>,
212    materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>,
213    materialize_input_row_count: RelabeledGuardedIntCounterVec<3>,
214    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>,
215}
216
217pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
218
219pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
220    GLOBAL_STREAMING_METRICS
221        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
222        .clone()
223}
224
225impl StreamingMetrics {
226    fn new(registry: &Registry, level: MetricLevel) -> Self {
227        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
228            "stream_executor_row_count",
229            "Total number of rows that have been output from each executor",
230            &["actor_id", "fragment_id", "executor_identity"],
231            registry
232        )
233        .unwrap()
234        .relabel_debug_1(level);
235
236        let stream_node_output_row_count = CountMap::new();
237        let stream_node_output_blocking_duration_ns = CountMap::new();
238
239        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
240            "stream_source_output_rows_counts",
241            "Total number of rows that have been output from source",
242            &["source_id", "source_name", "actor_id", "fragment_id"],
243            registry
244        )
245        .unwrap();
246
247        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
248            "stream_source_split_change_event_count",
249            "Total number of split change events that have been operated by source",
250            &["source_id", "source_name", "actor_id", "fragment_id"],
251            registry
252        )
253        .unwrap();
254
255        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
256            "stream_source_backfill_rows_counts",
257            "Total number of rows that have been backfilled for source",
258            &["source_id", "source_name", "actor_id", "fragment_id"],
259            registry
260        )
261        .unwrap();
262
263        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
264            "stream_sink_input_row_count",
265            "Total number of rows streamed into sink executors",
266            &["sink_id", "actor_id", "fragment_id"],
267            registry
268        )
269        .unwrap();
270
271        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
272            "stream_sink_input_bytes",
273            "Total size of chunks streamed into sink executors",
274            &["sink_id", "actor_id", "fragment_id"],
275            registry
276        )
277        .unwrap();
278
279        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
280            "stream_mview_input_row_count",
281            "Total number of rows streamed into materialize executors",
282            &["actor_id", "table_id", "fragment_id"],
283            registry
284        )
285        .unwrap()
286        .relabel_debug_1(level);
287
288        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
289            "stream_mview_current_epoch",
290            "The current epoch of the materialized executor",
291            &["actor_id", "table_id", "fragment_id"],
292            registry
293        )
294        .unwrap()
295        .relabel_debug_1(level);
296
297        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
298            "stream_sink_chunk_buffer_size",
299            "Total size of chunks buffered in a barrier",
300            &["sink_id", "actor_id", "fragment_id"],
301            registry
302        )
303        .unwrap();
304
305        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
306            "stream_actor_actor_execution_time",
307            "Total execution time (s) of an actor",
308            &["actor_id"],
309            registry
310        )
311        .unwrap();
312
313        let actor_output_buffer_blocking_duration_ns =
314            register_guarded_int_counter_vec_with_registry!(
315                "stream_actor_output_buffer_blocking_duration_ns",
316                "Total blocking duration (ns) of output buffer",
317                &["actor_id", "fragment_id", "downstream_fragment_id"],
318                registry
319            )
320            .unwrap()
321            // mask the first label `actor_id` if the level is less verbose than `Debug`
322            .relabel_debug_1(level);
323
324        let actor_input_buffer_blocking_duration_ns =
325            register_guarded_int_counter_vec_with_registry!(
326                "stream_actor_input_buffer_blocking_duration_ns",
327                "Total blocking duration (ns) of input buffer",
328                &["actor_id", "fragment_id", "upstream_fragment_id"],
329                registry
330            )
331            .unwrap();
332
333        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
334            "stream_exchange_frag_recv_size",
335            "Total size of messages that have been received from upstream Fragment",
336            &["up_fragment_id", "down_fragment_id"],
337            registry
338        )
339        .unwrap();
340
341        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
342            "stream_actor_fast_poll_duration",
343            "tokio's metrics",
344            &["actor_id"],
345            registry
346        )
347        .unwrap();
348
349        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
350            "stream_actor_fast_poll_cnt",
351            "tokio's metrics",
352            &["actor_id"],
353            registry
354        )
355        .unwrap();
356
357        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
358            "stream_actor_slow_poll_duration",
359            "tokio's metrics",
360            &["actor_id"],
361            registry
362        )
363        .unwrap();
364
365        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
366            "stream_actor_slow_poll_cnt",
367            "tokio's metrics",
368            &["actor_id"],
369            registry
370        )
371        .unwrap();
372
373        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
374            "stream_actor_poll_duration",
375            "tokio's metrics",
376            &["actor_id"],
377            registry
378        )
379        .unwrap();
380
381        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
382            "stream_actor_poll_cnt",
383            "tokio's metrics",
384            &["actor_id"],
385            registry
386        )
387        .unwrap();
388
389        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
390            "stream_actor_scheduled_duration",
391            "tokio's metrics",
392            &["actor_id"],
393            registry
394        )
395        .unwrap();
396
397        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
398            "stream_actor_scheduled_cnt",
399            "tokio's metrics",
400            &["actor_id"],
401            registry
402        )
403        .unwrap();
404
405        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
406            "stream_actor_idle_duration",
407            "tokio's metrics",
408            &["actor_id"],
409            registry
410        )
411        .unwrap();
412
413        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
414            "stream_actor_idle_cnt",
415            "tokio's metrics",
416            &["actor_id"],
417            registry
418        )
419        .unwrap();
420
421        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
422            "stream_actor_in_record_cnt",
423            "Total number of rows actor received",
424            &["actor_id", "fragment_id", "upstream_fragment_id"],
425            registry
426        )
427        .unwrap()
428        .relabel_debug_1(level);
429
430        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
431            "stream_actor_out_record_cnt",
432            "Total number of rows actor sent",
433            &["actor_id", "fragment_id"],
434            registry
435        )
436        .unwrap()
437        .relabel_debug_1(level);
438
439        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
440            "stream_actor_current_epoch",
441            "Current epoch of actor",
442            &["actor_id", "fragment_id"],
443            registry
444        )
445        .unwrap()
446        .relabel_debug_1(level);
447
448        let actor_count = register_guarded_int_gauge_vec_with_registry!(
449            "stream_actor_count",
450            "Total number of actors (parallelism)",
451            &["fragment_id"],
452            registry
453        )
454        .unwrap();
455
456        let opts = histogram_opts!(
457            "stream_merge_barrier_align_duration",
458            "Duration of merge align barrier",
459            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
460        );
461        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
462            opts,
463            &["actor_id", "fragment_id"],
464            registry
465        )
466        .unwrap()
467        .relabel_debug_1(level);
468
469        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
470            "stream_join_lookup_miss_count",
471            "Join executor lookup miss duration",
472            &["side", "join_table_id", "actor_id", "fragment_id"],
473            registry
474        )
475        .unwrap();
476
477        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
478            "stream_join_lookup_total_count",
479            "Join executor lookup total operation",
480            &["side", "join_table_id", "actor_id", "fragment_id"],
481            registry
482        )
483        .unwrap();
484
485        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
486            "stream_join_insert_cache_miss_count",
487            "Join executor cache miss when insert operation",
488            &["side", "join_table_id", "actor_id", "fragment_id"],
489            registry
490        )
491        .unwrap();
492
493        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
494            "stream_join_actor_input_waiting_duration_ns",
495            "Total waiting duration (ns) of input buffer of join actor",
496            &["actor_id", "fragment_id"],
497            registry
498        )
499        .unwrap();
500
501        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
502            "stream_join_match_duration_ns",
503            "Matching duration for each side",
504            &["actor_id", "fragment_id", "side"],
505            registry
506        )
507        .unwrap();
508
509        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
510            "stream_barrier_align_duration_ns",
511            "Duration of join align barrier",
512            &["actor_id", "fragment_id", "wait_side", "executor"],
513            registry
514        )
515        .unwrap()
516        .relabel_debug_1(level);
517
518        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
519            "stream_join_cached_entry_count",
520            "Number of cached entries in streaming join operators",
521            &["actor_id", "fragment_id", "side"],
522            registry
523        )
524        .unwrap();
525
526        let join_matched_join_keys_opts = histogram_opts!(
527            "stream_join_matched_join_keys",
528            "The number of keys matched in the opposite side",
529            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
530        );
531
532        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
533            join_matched_join_keys_opts,
534            &["actor_id", "fragment_id", "table_id"],
535            registry
536        )
537        .unwrap()
538        .relabel_debug_1(level);
539
540        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
541            "stream_agg_lookup_miss_count",
542            "Aggregation executor lookup miss duration",
543            &["table_id", "actor_id", "fragment_id"],
544            registry
545        )
546        .unwrap();
547
548        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
549            "stream_agg_lookup_total_count",
550            "Aggregation executor lookup total operation",
551            &["table_id", "actor_id", "fragment_id"],
552            registry
553        )
554        .unwrap();
555
556        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
557            "stream_agg_distinct_cache_miss_count",
558            "Aggregation executor dinsinct miss duration",
559            &["table_id", "actor_id", "fragment_id"],
560            registry
561        )
562        .unwrap();
563
564        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
565            "stream_agg_distinct_total_cache_count",
566            "Aggregation executor distinct total operation",
567            &["table_id", "actor_id", "fragment_id"],
568            registry
569        )
570        .unwrap();
571
572        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
573            "stream_agg_distinct_cached_entry_count",
574            "Total entry counts in distinct aggregation executor cache",
575            &["table_id", "actor_id", "fragment_id"],
576            registry
577        )
578        .unwrap();
579
580        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
581            "stream_agg_dirty_groups_count",
582            "Total dirty group counts in aggregation executor",
583            &["table_id", "actor_id", "fragment_id"],
584            registry
585        )
586        .unwrap();
587
588        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
589            "stream_agg_dirty_groups_heap_size",
590            "Total dirty group heap size in aggregation executor",
591            &["table_id", "actor_id", "fragment_id"],
592            registry
593        )
594        .unwrap();
595
596        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
597            "stream_agg_state_cache_lookup_count",
598            "Aggregation executor state cache lookup count",
599            &["table_id", "actor_id", "fragment_id"],
600            registry
601        )
602        .unwrap();
603
604        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
605            "stream_agg_state_cache_miss_count",
606            "Aggregation executor state cache miss count",
607            &["table_id", "actor_id", "fragment_id"],
608            registry
609        )
610        .unwrap();
611
612        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
613            "stream_group_top_n_cache_miss_count",
614            "Group top n executor cache miss count",
615            &["table_id", "actor_id", "fragment_id"],
616            registry
617        )
618        .unwrap();
619
620        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
621            "stream_group_top_n_total_query_cache_count",
622            "Group top n executor query cache total count",
623            &["table_id", "actor_id", "fragment_id"],
624            registry
625        )
626        .unwrap();
627
628        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
629            "stream_group_top_n_cached_entry_count",
630            "Total entry counts in group top n executor cache",
631            &["table_id", "actor_id", "fragment_id"],
632            registry
633        )
634        .unwrap();
635
636        let group_top_n_appendonly_cache_miss_count =
637            register_guarded_int_counter_vec_with_registry!(
638                "stream_group_top_n_appendonly_cache_miss_count",
639                "Group top n appendonly executor cache miss count",
640                &["table_id", "actor_id", "fragment_id"],
641                registry
642            )
643            .unwrap();
644
645        let group_top_n_appendonly_total_query_cache_count =
646            register_guarded_int_counter_vec_with_registry!(
647                "stream_group_top_n_appendonly_total_query_cache_count",
648                "Group top n appendonly executor total cache count",
649                &["table_id", "actor_id", "fragment_id"],
650                registry
651            )
652            .unwrap();
653
654        let group_top_n_appendonly_cached_entry_count =
655            register_guarded_int_gauge_vec_with_registry!(
656                "stream_group_top_n_appendonly_cached_entry_count",
657                "Total entry counts in group top n appendonly executor cache",
658                &["table_id", "actor_id", "fragment_id"],
659                registry
660            )
661            .unwrap();
662
663        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
664            "stream_lookup_cache_miss_count",
665            "Lookup executor cache miss count",
666            &["table_id", "actor_id", "fragment_id"],
667            registry
668        )
669        .unwrap();
670
671        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
672            "stream_lookup_total_query_cache_count",
673            "Lookup executor query cache total count",
674            &["table_id", "actor_id", "fragment_id"],
675            registry
676        )
677        .unwrap();
678
679        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
680            "stream_lookup_cached_entry_count",
681            "Total entry counts in lookup executor cache",
682            &["table_id", "actor_id", "fragment_id"],
683            registry
684        )
685        .unwrap();
686
687        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
688            "stream_temporal_join_cache_miss_count",
689            "Temporal join executor cache miss count",
690            &["table_id", "actor_id", "fragment_id"],
691            registry
692        )
693        .unwrap();
694
695        let temporal_join_total_query_cache_count =
696            register_guarded_int_counter_vec_with_registry!(
697                "stream_temporal_join_total_query_cache_count",
698                "Temporal join executor query cache total count",
699                &["table_id", "actor_id", "fragment_id"],
700                registry
701            )
702            .unwrap();
703
704        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
705            "stream_temporal_join_cached_entry_count",
706            "Total entry count in temporal join executor cache",
707            &["table_id", "actor_id", "fragment_id"],
708            registry
709        )
710        .unwrap();
711
712        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
713            "stream_agg_cached_entry_count",
714            "Number of cached keys in streaming aggregation operators",
715            &["table_id", "actor_id", "fragment_id"],
716            registry
717        )
718        .unwrap();
719
720        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
721            "stream_agg_chunk_lookup_miss_count",
722            "Aggregation executor chunk-level lookup miss duration",
723            &["table_id", "actor_id", "fragment_id"],
724            registry
725        )
726        .unwrap();
727
728        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
729            "stream_agg_chunk_lookup_total_count",
730            "Aggregation executor chunk-level lookup total operation",
731            &["table_id", "actor_id", "fragment_id"],
732            registry
733        )
734        .unwrap();
735
736        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
737            "stream_backfill_snapshot_read_row_count",
738            "Total number of rows that have been read from the backfill snapshot",
739            &["table_id", "actor_id"],
740            registry
741        )
742        .unwrap();
743
744        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
745            "stream_backfill_upstream_output_row_count",
746            "Total number of rows that have been output from the backfill upstream",
747            &["table_id", "actor_id"],
748            registry
749        )
750        .unwrap();
751
752        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
753            "stream_cdc_backfill_snapshot_read_row_count",
754            "Total number of rows that have been read from the cdc_backfill snapshot",
755            &["table_id", "actor_id"],
756            registry
757        )
758        .unwrap();
759
760        let cdc_backfill_upstream_output_row_count =
761            register_guarded_int_counter_vec_with_registry!(
762                "stream_cdc_backfill_upstream_output_row_count",
763                "Total number of rows that have been output from the cdc_backfill upstream",
764                &["table_id", "actor_id"],
765                registry
766            )
767            .unwrap();
768
769        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
770            "stream_snapshot_backfill_consume_snapshot_row_count",
771            "Total number of rows that have been output from snapshot backfill",
772            &["table_id", "actor_id", "stage"],
773            registry
774        )
775        .unwrap();
776
777        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
778            "stream_over_window_cached_entry_count",
779            "Total entry (partition) count in over window executor cache",
780            &["table_id", "actor_id", "fragment_id"],
781            registry
782        )
783        .unwrap();
784
785        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
786            "stream_over_window_cache_lookup_count",
787            "Over window executor cache lookup count",
788            &["table_id", "actor_id", "fragment_id"],
789            registry
790        )
791        .unwrap();
792
793        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
794            "stream_over_window_cache_miss_count",
795            "Over window executor cache miss count",
796            &["table_id", "actor_id", "fragment_id"],
797            registry
798        )
799        .unwrap();
800
801        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
802            "stream_over_window_range_cache_entry_count",
803            "Over window partition range cache entry count",
804            &["table_id", "actor_id", "fragment_id"],
805            registry,
806        )
807        .unwrap();
808
809        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
810            "stream_over_window_range_cache_lookup_count",
811            "Over window partition range cache lookup count",
812            &["table_id", "actor_id", "fragment_id"],
813            registry
814        )
815        .unwrap();
816
817        let over_window_range_cache_left_miss_count =
818            register_guarded_int_counter_vec_with_registry!(
819                "stream_over_window_range_cache_left_miss_count",
820                "Over window partition range cache left miss count",
821                &["table_id", "actor_id", "fragment_id"],
822                registry
823            )
824            .unwrap();
825
826        let over_window_range_cache_right_miss_count =
827            register_guarded_int_counter_vec_with_registry!(
828                "stream_over_window_range_cache_right_miss_count",
829                "Over window partition range cache right miss count",
830                &["table_id", "actor_id", "fragment_id"],
831                registry
832            )
833            .unwrap();
834
835        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
836            "stream_over_window_accessed_entry_count",
837            "Over window accessed entry count",
838            &["table_id", "actor_id", "fragment_id"],
839            registry
840        )
841        .unwrap();
842
843        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
844            "stream_over_window_compute_count",
845            "Over window compute count",
846            &["table_id", "actor_id", "fragment_id"],
847            registry
848        )
849        .unwrap();
850
851        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
852            "stream_over_window_same_output_count",
853            "Over window same output count",
854            &["table_id", "actor_id", "fragment_id"],
855            registry
856        )
857        .unwrap();
858
859        let opts = histogram_opts!(
860            "stream_barrier_inflight_duration_seconds",
861            "barrier_inflight_latency",
862            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
863        );
864        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
865
866        let opts = histogram_opts!(
867            "stream_barrier_sync_storage_duration_seconds",
868            "barrier_sync_latency",
869            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
870        );
871        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
872
873        let opts = histogram_opts!(
874            "stream_barrier_batch_size",
875            "barrier_batch_size",
876            exponential_buckets(1.0, 2.0, 8).unwrap()
877        );
878        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
879
880        let barrier_manager_progress = register_int_counter_with_registry!(
881            "stream_barrier_manager_progress",
882            "The number of actors that have processed the earliest in-flight barriers",
883            registry
884        )
885        .unwrap();
886
887        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
888            "sync_kv_log_store_wait_next_poll_ns",
889            "Total duration (ns) of waiting for next poll",
890            &["actor_id", "target", "fragment_id", "relation"],
891            registry
892        )
893        .unwrap();
894
895        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
896            "sync_kv_log_store_read_count",
897            "read row count throughput of sync_kv log store",
898            &["type", "actor_id", "target", "fragment_id", "relation"],
899            registry
900        )
901        .unwrap();
902
903        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
904            "sync_kv_log_store_read_size",
905            "read size throughput of sync_kv log store",
906            &["type", "actor_id", "target", "fragment_id", "relation"],
907            registry
908        )
909        .unwrap();
910
911        let sync_kv_log_store_write_pause_duration_ns =
912            register_guarded_int_counter_vec_with_registry!(
913                "sync_kv_log_store_write_pause_duration_ns",
914                "Duration (ns) of sync_kv log store write pause",
915                &["actor_id", "target", "fragment_id", "relation"],
916                registry
917            )
918            .unwrap();
919
920        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
921            "sync_kv_log_store_state",
922            "clean/unclean state transition for sync_kv log store",
923            &["state", "actor_id", "target", "fragment_id", "relation"],
924            registry
925        )
926        .unwrap();
927
928        let sync_kv_log_store_storage_write_count =
929            register_guarded_int_counter_vec_with_registry!(
930                "sync_kv_log_store_storage_write_count",
931                "Write row count throughput of sync_kv log store",
932                &["actor_id", "target", "fragment_id", "relation"],
933                registry
934            )
935            .unwrap();
936
937        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
938            "sync_kv_log_store_storage_write_size",
939            "Write size throughput of sync_kv log store",
940            &["actor_id", "target", "fragment_id", "relation"],
941            registry
942        )
943        .unwrap();
944
945        let sync_kv_log_store_buffer_unconsumed_item_count =
946            register_guarded_int_gauge_vec_with_registry!(
947                "sync_kv_log_store_buffer_unconsumed_item_count",
948                "Number of Unconsumed Item in buffer",
949                &["actor_id", "target", "fragment_id", "relation"],
950                registry
951            )
952            .unwrap();
953
954        let sync_kv_log_store_buffer_unconsumed_row_count =
955            register_guarded_int_gauge_vec_with_registry!(
956                "sync_kv_log_store_buffer_unconsumed_row_count",
957                "Number of Unconsumed Row in buffer",
958                &["actor_id", "target", "fragment_id", "relation"],
959                registry
960            )
961            .unwrap();
962
963        let sync_kv_log_store_buffer_unconsumed_epoch_count =
964            register_guarded_int_gauge_vec_with_registry!(
965                "sync_kv_log_store_buffer_unconsumed_epoch_count",
966                "Number of Unconsumed Epoch in buffer",
967                &["actor_id", "target", "fragment_id", "relation"],
968                registry
969            )
970            .unwrap();
971
972        let sync_kv_log_store_buffer_unconsumed_min_epoch =
973            register_guarded_int_gauge_vec_with_registry!(
974                "sync_kv_log_store_buffer_unconsumed_min_epoch",
975                "Number of Unconsumed Epoch in buffer",
976                &["actor_id", "target", "fragment_id", "relation"],
977                registry
978            )
979            .unwrap();
980
981        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
982            "kv_log_store_storage_write_count",
983            "Write row count throughput of kv log store",
984            &["actor_id", "connector", "sink_id", "sink_name"],
985            registry
986        )
987        .unwrap();
988
989        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
990            "kv_log_store_storage_write_size",
991            "Write size throughput of kv log store",
992            &["actor_id", "connector", "sink_id", "sink_name"],
993            registry
994        )
995        .unwrap();
996
997        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
998            "kv_log_store_storage_read_count",
999            "Write row count throughput of kv log store",
1000            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1001            registry
1002        )
1003        .unwrap();
1004
1005        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1006            "kv_log_store_storage_read_size",
1007            "Write size throughput of kv log store",
1008            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1009            registry
1010        )
1011        .unwrap();
1012
1013        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1014            "kv_log_store_rewind_count",
1015            "Kv log store rewind rate",
1016            &["actor_id", "connector", "sink_id", "sink_name"],
1017            registry
1018        )
1019        .unwrap();
1020
1021        let kv_log_store_rewind_delay_opts = {
1022            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1023            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1024                - REWIND_BASE_DELAY.as_secs_f64().log2())
1025            .ceil() as usize;
1026            let buckets = exponential_buckets(
1027                REWIND_BASE_DELAY.as_secs_f64(),
1028                REWIND_BACKOFF_FACTOR as _,
1029                bucket_count,
1030            )
1031            .unwrap();
1032            histogram_opts!(
1033                "kv_log_store_rewind_delay",
1034                "Kv log store rewind delay",
1035                buckets,
1036            )
1037        };
1038
1039        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1040            kv_log_store_rewind_delay_opts,
1041            &["actor_id", "connector", "sink_id", "sink_name"],
1042            registry
1043        )
1044        .unwrap();
1045
1046        let kv_log_store_buffer_unconsumed_item_count =
1047            register_guarded_int_gauge_vec_with_registry!(
1048                "kv_log_store_buffer_unconsumed_item_count",
1049                "Number of Unconsumed Item in buffer",
1050                &["actor_id", "connector", "sink_id", "sink_name"],
1051                registry
1052            )
1053            .unwrap();
1054
1055        let kv_log_store_buffer_unconsumed_row_count =
1056            register_guarded_int_gauge_vec_with_registry!(
1057                "kv_log_store_buffer_unconsumed_row_count",
1058                "Number of Unconsumed Row in buffer",
1059                &["actor_id", "connector", "sink_id", "sink_name"],
1060                registry
1061            )
1062            .unwrap();
1063
1064        let kv_log_store_buffer_unconsumed_epoch_count =
1065            register_guarded_int_gauge_vec_with_registry!(
1066                "kv_log_store_buffer_unconsumed_epoch_count",
1067                "Number of Unconsumed Epoch in buffer",
1068                &["actor_id", "connector", "sink_id", "sink_name"],
1069                registry
1070            )
1071            .unwrap();
1072
1073        let kv_log_store_buffer_unconsumed_min_epoch =
1074            register_guarded_int_gauge_vec_with_registry!(
1075                "kv_log_store_buffer_unconsumed_min_epoch",
1076                "Number of Unconsumed Epoch in buffer",
1077                &["actor_id", "connector", "sink_id", "sink_name"],
1078                registry
1079            )
1080            .unwrap();
1081
1082        let lru_runtime_loop_count = register_int_counter_with_registry!(
1083            "lru_runtime_loop_count",
1084            "The counts of the eviction loop in LRU manager per second",
1085            registry
1086        )
1087        .unwrap();
1088
1089        let lru_latest_sequence = register_int_gauge_with_registry!(
1090            "lru_latest_sequence",
1091            "Current LRU global sequence",
1092            registry,
1093        )
1094        .unwrap();
1095
1096        let lru_watermark_sequence = register_int_gauge_with_registry!(
1097            "lru_watermark_sequence",
1098            "Current LRU watermark sequence",
1099            registry,
1100        )
1101        .unwrap();
1102
1103        let lru_eviction_policy = register_int_gauge_with_registry!(
1104            "lru_eviction_policy",
1105            "Current LRU eviction policy",
1106            registry,
1107        )
1108        .unwrap();
1109
1110        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1111            "jemalloc_allocated_bytes",
1112            "The allocated memory jemalloc, got from jemalloc_ctl",
1113            registry
1114        )
1115        .unwrap();
1116
1117        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1118            "jemalloc_active_bytes",
1119            "The active memory jemalloc, got from jemalloc_ctl",
1120            registry
1121        )
1122        .unwrap();
1123
1124        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1125            "jemalloc_resident_bytes",
1126            "The active memory jemalloc, got from jemalloc_ctl",
1127            registry
1128        )
1129        .unwrap();
1130
1131        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1132            "jemalloc_metadata_bytes",
1133            "The active memory jemalloc, got from jemalloc_ctl",
1134            registry
1135        )
1136        .unwrap();
1137
1138        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1139            "jvm_allocated_bytes",
1140            "The allocated jvm memory",
1141            registry
1142        )
1143        .unwrap();
1144
1145        let jvm_active_bytes = register_int_gauge_with_registry!(
1146            "jvm_active_bytes",
1147            "The active jvm memory",
1148            registry
1149        )
1150        .unwrap();
1151
1152        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1153            "stream_materialize_cache_hit_count",
1154            "Materialize executor cache hit count",
1155            &["actor_id", "table_id", "fragment_id"],
1156            registry
1157        )
1158        .unwrap()
1159        .relabel_debug_1(level);
1160
1161        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1162            "stream_materialize_cache_total_count",
1163            "Materialize executor cache total operation",
1164            &["actor_id", "table_id", "fragment_id"],
1165            registry
1166        )
1167        .unwrap()
1168        .relabel_debug_1(level);
1169
1170        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1171            "stream_memory_usage",
1172            "Memory usage for stream executors",
1173            &["actor_id", "table_id", "desc"],
1174            registry
1175        )
1176        .unwrap()
1177        .relabel_debug_1(level);
1178
1179        Self {
1180            level,
1181            executor_row_count,
1182            mem_stream_node_output_row_count: stream_node_output_row_count,
1183            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1184            actor_execution_time,
1185            actor_scheduled_duration,
1186            actor_scheduled_cnt,
1187            actor_fast_poll_duration,
1188            actor_fast_poll_cnt,
1189            actor_slow_poll_duration,
1190            actor_slow_poll_cnt,
1191            actor_poll_duration,
1192            actor_poll_cnt,
1193            actor_idle_duration,
1194            actor_idle_cnt,
1195            actor_count,
1196            actor_in_record_cnt,
1197            actor_out_record_cnt,
1198            actor_current_epoch,
1199            source_output_row_count,
1200            source_split_change_count,
1201            source_backfill_row_count,
1202            sink_input_row_count,
1203            sink_input_bytes,
1204            sink_chunk_buffer_size,
1205            exchange_frag_recv_size,
1206            merge_barrier_align_duration,
1207            actor_output_buffer_blocking_duration_ns,
1208            actor_input_buffer_blocking_duration_ns,
1209            join_lookup_miss_count,
1210            join_lookup_total_count,
1211            join_insert_cache_miss_count,
1212            join_actor_input_waiting_duration_ns,
1213            join_match_duration_ns,
1214            join_cached_entry_count,
1215            join_matched_join_keys,
1216            barrier_align_duration,
1217            agg_lookup_miss_count,
1218            agg_total_lookup_count,
1219            agg_cached_entry_count,
1220            agg_chunk_lookup_miss_count,
1221            agg_chunk_total_lookup_count,
1222            agg_dirty_groups_count,
1223            agg_dirty_groups_heap_size,
1224            agg_distinct_cache_miss_count,
1225            agg_distinct_total_cache_count,
1226            agg_distinct_cached_entry_count,
1227            agg_state_cache_lookup_count,
1228            agg_state_cache_miss_count,
1229            group_top_n_cache_miss_count,
1230            group_top_n_total_query_cache_count,
1231            group_top_n_cached_entry_count,
1232            group_top_n_appendonly_cache_miss_count,
1233            group_top_n_appendonly_total_query_cache_count,
1234            group_top_n_appendonly_cached_entry_count,
1235            lookup_cache_miss_count,
1236            lookup_total_query_cache_count,
1237            lookup_cached_entry_count,
1238            temporal_join_cache_miss_count,
1239            temporal_join_total_query_cache_count,
1240            temporal_join_cached_entry_count,
1241            backfill_snapshot_read_row_count,
1242            backfill_upstream_output_row_count,
1243            cdc_backfill_snapshot_read_row_count,
1244            cdc_backfill_upstream_output_row_count,
1245            snapshot_backfill_consume_row_count,
1246            over_window_cached_entry_count,
1247            over_window_cache_lookup_count,
1248            over_window_cache_miss_count,
1249            over_window_range_cache_entry_count,
1250            over_window_range_cache_lookup_count,
1251            over_window_range_cache_left_miss_count,
1252            over_window_range_cache_right_miss_count,
1253            over_window_accessed_entry_count,
1254            over_window_compute_count,
1255            over_window_same_output_count,
1256            barrier_inflight_latency,
1257            barrier_sync_latency,
1258            barrier_batch_size,
1259            barrier_manager_progress,
1260            kv_log_store_storage_write_count,
1261            kv_log_store_storage_write_size,
1262            kv_log_store_rewind_count,
1263            kv_log_store_rewind_delay,
1264            kv_log_store_storage_read_count,
1265            kv_log_store_storage_read_size,
1266            kv_log_store_buffer_unconsumed_item_count,
1267            kv_log_store_buffer_unconsumed_row_count,
1268            kv_log_store_buffer_unconsumed_epoch_count,
1269            kv_log_store_buffer_unconsumed_min_epoch,
1270            sync_kv_log_store_read_count,
1271            sync_kv_log_store_read_size,
1272            sync_kv_log_store_write_pause_duration_ns,
1273            sync_kv_log_store_state,
1274            sync_kv_log_store_wait_next_poll_ns,
1275            sync_kv_log_store_storage_write_count,
1276            sync_kv_log_store_storage_write_size,
1277            sync_kv_log_store_buffer_unconsumed_item_count,
1278            sync_kv_log_store_buffer_unconsumed_row_count,
1279            sync_kv_log_store_buffer_unconsumed_epoch_count,
1280            sync_kv_log_store_buffer_unconsumed_min_epoch,
1281            lru_runtime_loop_count,
1282            lru_latest_sequence,
1283            lru_watermark_sequence,
1284            lru_eviction_policy,
1285            jemalloc_allocated_bytes,
1286            jemalloc_active_bytes,
1287            jemalloc_resident_bytes,
1288            jemalloc_metadata_bytes,
1289            jvm_allocated_bytes,
1290            jvm_active_bytes,
1291            stream_memory_usage,
1292            materialize_cache_hit_count,
1293            materialize_cache_total_count,
1294            materialize_input_row_count,
1295            materialize_current_epoch,
1296        }
1297    }
1298
1299    /// Create a new `StreamingMetrics` instance used in tests or other places.
1300    pub fn unused() -> Self {
1301        global_streaming_metrics(MetricLevel::Disabled)
1302    }
1303
1304    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1305        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1306        let actor_execution_time = self
1307            .actor_execution_time
1308            .with_guarded_label_values(label_list);
1309        let actor_scheduled_duration = self
1310            .actor_scheduled_duration
1311            .with_guarded_label_values(label_list);
1312        let actor_scheduled_cnt = self
1313            .actor_scheduled_cnt
1314            .with_guarded_label_values(label_list);
1315        let actor_fast_poll_duration = self
1316            .actor_fast_poll_duration
1317            .with_guarded_label_values(label_list);
1318        let actor_fast_poll_cnt = self
1319            .actor_fast_poll_cnt
1320            .with_guarded_label_values(label_list);
1321        let actor_slow_poll_duration = self
1322            .actor_slow_poll_duration
1323            .with_guarded_label_values(label_list);
1324        let actor_slow_poll_cnt = self
1325            .actor_slow_poll_cnt
1326            .with_guarded_label_values(label_list);
1327        let actor_poll_duration = self
1328            .actor_poll_duration
1329            .with_guarded_label_values(label_list);
1330        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1331        let actor_idle_duration = self
1332            .actor_idle_duration
1333            .with_guarded_label_values(label_list);
1334        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1335        ActorMetrics {
1336            actor_execution_time,
1337            actor_scheduled_duration,
1338            actor_scheduled_cnt,
1339            actor_fast_poll_duration,
1340            actor_fast_poll_cnt,
1341            actor_slow_poll_duration,
1342            actor_slow_poll_cnt,
1343            actor_poll_duration,
1344            actor_poll_cnt,
1345            actor_idle_duration,
1346            actor_idle_cnt,
1347        }
1348    }
1349
1350    pub(crate) fn new_actor_input_metrics(
1351        &self,
1352        actor_id: ActorId,
1353        fragment_id: FragmentId,
1354        upstream_fragment_id: FragmentId,
1355    ) -> ActorInputMetrics {
1356        let actor_id_str = actor_id.to_string();
1357        let fragment_id_str = fragment_id.to_string();
1358        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1359        ActorInputMetrics {
1360            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1361                &actor_id_str,
1362                &fragment_id_str,
1363                &upstream_fragment_id_str,
1364            ]),
1365            actor_input_buffer_blocking_duration_ns: self
1366                .actor_input_buffer_blocking_duration_ns
1367                .with_guarded_label_values(&[
1368                    &actor_id_str,
1369                    &fragment_id_str,
1370                    &upstream_fragment_id_str,
1371                ]),
1372        }
1373    }
1374
1375    pub fn new_sink_exec_metrics(
1376        &self,
1377        id: SinkId,
1378        actor_id: ActorId,
1379        fragment_id: FragmentId,
1380    ) -> SinkExecutorMetrics {
1381        let label_list: &[&str; 3] = &[
1382            &id.to_string(),
1383            &actor_id.to_string(),
1384            &fragment_id.to_string(),
1385        ];
1386        SinkExecutorMetrics {
1387            sink_input_row_count: self
1388                .sink_input_row_count
1389                .with_guarded_label_values(label_list),
1390            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1391            sink_chunk_buffer_size: self
1392                .sink_chunk_buffer_size
1393                .with_guarded_label_values(label_list),
1394        }
1395    }
1396
1397    pub fn new_group_top_n_metrics(
1398        &self,
1399        table_id: u32,
1400        actor_id: ActorId,
1401        fragment_id: FragmentId,
1402    ) -> GroupTopNMetrics {
1403        let label_list: &[&str; 3] = &[
1404            &table_id.to_string(),
1405            &actor_id.to_string(),
1406            &fragment_id.to_string(),
1407        ];
1408
1409        GroupTopNMetrics {
1410            group_top_n_cache_miss_count: self
1411                .group_top_n_cache_miss_count
1412                .with_guarded_label_values(label_list),
1413            group_top_n_total_query_cache_count: self
1414                .group_top_n_total_query_cache_count
1415                .with_guarded_label_values(label_list),
1416            group_top_n_cached_entry_count: self
1417                .group_top_n_cached_entry_count
1418                .with_guarded_label_values(label_list),
1419        }
1420    }
1421
1422    pub fn new_append_only_group_top_n_metrics(
1423        &self,
1424        table_id: u32,
1425        actor_id: ActorId,
1426        fragment_id: FragmentId,
1427    ) -> GroupTopNMetrics {
1428        let label_list: &[&str; 3] = &[
1429            &table_id.to_string(),
1430            &actor_id.to_string(),
1431            &fragment_id.to_string(),
1432        ];
1433
1434        GroupTopNMetrics {
1435            group_top_n_cache_miss_count: self
1436                .group_top_n_appendonly_cache_miss_count
1437                .with_guarded_label_values(label_list),
1438            group_top_n_total_query_cache_count: self
1439                .group_top_n_appendonly_total_query_cache_count
1440                .with_guarded_label_values(label_list),
1441            group_top_n_cached_entry_count: self
1442                .group_top_n_appendonly_cached_entry_count
1443                .with_guarded_label_values(label_list),
1444        }
1445    }
1446
1447    pub fn new_lookup_executor_metrics(
1448        &self,
1449        table_id: TableId,
1450        actor_id: ActorId,
1451        fragment_id: FragmentId,
1452    ) -> LookupExecutorMetrics {
1453        let label_list: &[&str; 3] = &[
1454            &table_id.to_string(),
1455            &actor_id.to_string(),
1456            &fragment_id.to_string(),
1457        ];
1458
1459        LookupExecutorMetrics {
1460            lookup_cache_miss_count: self
1461                .lookup_cache_miss_count
1462                .with_guarded_label_values(label_list),
1463            lookup_total_query_cache_count: self
1464                .lookup_total_query_cache_count
1465                .with_guarded_label_values(label_list),
1466            lookup_cached_entry_count: self
1467                .lookup_cached_entry_count
1468                .with_guarded_label_values(label_list),
1469        }
1470    }
1471
1472    pub fn new_hash_agg_metrics(
1473        &self,
1474        table_id: u32,
1475        actor_id: ActorId,
1476        fragment_id: FragmentId,
1477    ) -> HashAggMetrics {
1478        let label_list: &[&str; 3] = &[
1479            &table_id.to_string(),
1480            &actor_id.to_string(),
1481            &fragment_id.to_string(),
1482        ];
1483        HashAggMetrics {
1484            agg_lookup_miss_count: self
1485                .agg_lookup_miss_count
1486                .with_guarded_label_values(label_list),
1487            agg_total_lookup_count: self
1488                .agg_total_lookup_count
1489                .with_guarded_label_values(label_list),
1490            agg_cached_entry_count: self
1491                .agg_cached_entry_count
1492                .with_guarded_label_values(label_list),
1493            agg_chunk_lookup_miss_count: self
1494                .agg_chunk_lookup_miss_count
1495                .with_guarded_label_values(label_list),
1496            agg_chunk_total_lookup_count: self
1497                .agg_chunk_total_lookup_count
1498                .with_guarded_label_values(label_list),
1499            agg_dirty_groups_count: self
1500                .agg_dirty_groups_count
1501                .with_guarded_label_values(label_list),
1502            agg_dirty_groups_heap_size: self
1503                .agg_dirty_groups_heap_size
1504                .with_guarded_label_values(label_list),
1505            agg_state_cache_lookup_count: self
1506                .agg_state_cache_lookup_count
1507                .with_guarded_label_values(label_list),
1508            agg_state_cache_miss_count: self
1509                .agg_state_cache_miss_count
1510                .with_guarded_label_values(label_list),
1511        }
1512    }
1513
1514    pub fn new_agg_distinct_dedup_metrics(
1515        &self,
1516        table_id: u32,
1517        actor_id: ActorId,
1518        fragment_id: FragmentId,
1519    ) -> AggDistinctDedupMetrics {
1520        let label_list: &[&str; 3] = &[
1521            &table_id.to_string(),
1522            &actor_id.to_string(),
1523            &fragment_id.to_string(),
1524        ];
1525        AggDistinctDedupMetrics {
1526            agg_distinct_cache_miss_count: self
1527                .agg_distinct_cache_miss_count
1528                .with_guarded_label_values(label_list),
1529            agg_distinct_total_cache_count: self
1530                .agg_distinct_total_cache_count
1531                .with_guarded_label_values(label_list),
1532            agg_distinct_cached_entry_count: self
1533                .agg_distinct_cached_entry_count
1534                .with_guarded_label_values(label_list),
1535        }
1536    }
1537
1538    pub fn new_temporal_join_metrics(
1539        &self,
1540        table_id: TableId,
1541        actor_id: ActorId,
1542        fragment_id: FragmentId,
1543    ) -> TemporalJoinMetrics {
1544        let label_list: &[&str; 3] = &[
1545            &table_id.to_string(),
1546            &actor_id.to_string(),
1547            &fragment_id.to_string(),
1548        ];
1549        TemporalJoinMetrics {
1550            temporal_join_cache_miss_count: self
1551                .temporal_join_cache_miss_count
1552                .with_guarded_label_values(label_list),
1553            temporal_join_total_query_cache_count: self
1554                .temporal_join_total_query_cache_count
1555                .with_guarded_label_values(label_list),
1556            temporal_join_cached_entry_count: self
1557                .temporal_join_cached_entry_count
1558                .with_guarded_label_values(label_list),
1559        }
1560    }
1561
1562    pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1563        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1564        BackfillMetrics {
1565            backfill_snapshot_read_row_count: self
1566                .backfill_snapshot_read_row_count
1567                .with_guarded_label_values(label_list),
1568            backfill_upstream_output_row_count: self
1569                .backfill_upstream_output_row_count
1570                .with_guarded_label_values(label_list),
1571        }
1572    }
1573
1574    pub fn new_cdc_backfill_metrics(
1575        &self,
1576        table_id: TableId,
1577        actor_id: ActorId,
1578    ) -> CdcBackfillMetrics {
1579        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1580        CdcBackfillMetrics {
1581            cdc_backfill_snapshot_read_row_count: self
1582                .cdc_backfill_snapshot_read_row_count
1583                .with_guarded_label_values(label_list),
1584            cdc_backfill_upstream_output_row_count: self
1585                .cdc_backfill_upstream_output_row_count
1586                .with_guarded_label_values(label_list),
1587        }
1588    }
1589
1590    pub fn new_over_window_metrics(
1591        &self,
1592        table_id: u32,
1593        actor_id: ActorId,
1594        fragment_id: FragmentId,
1595    ) -> OverWindowMetrics {
1596        let label_list: &[&str; 3] = &[
1597            &table_id.to_string(),
1598            &actor_id.to_string(),
1599            &fragment_id.to_string(),
1600        ];
1601        OverWindowMetrics {
1602            over_window_cached_entry_count: self
1603                .over_window_cached_entry_count
1604                .with_guarded_label_values(label_list),
1605            over_window_cache_lookup_count: self
1606                .over_window_cache_lookup_count
1607                .with_guarded_label_values(label_list),
1608            over_window_cache_miss_count: self
1609                .over_window_cache_miss_count
1610                .with_guarded_label_values(label_list),
1611            over_window_range_cache_entry_count: self
1612                .over_window_range_cache_entry_count
1613                .with_guarded_label_values(label_list),
1614            over_window_range_cache_lookup_count: self
1615                .over_window_range_cache_lookup_count
1616                .with_guarded_label_values(label_list),
1617            over_window_range_cache_left_miss_count: self
1618                .over_window_range_cache_left_miss_count
1619                .with_guarded_label_values(label_list),
1620            over_window_range_cache_right_miss_count: self
1621                .over_window_range_cache_right_miss_count
1622                .with_guarded_label_values(label_list),
1623            over_window_accessed_entry_count: self
1624                .over_window_accessed_entry_count
1625                .with_guarded_label_values(label_list),
1626            over_window_compute_count: self
1627                .over_window_compute_count
1628                .with_guarded_label_values(label_list),
1629            over_window_same_output_count: self
1630                .over_window_same_output_count
1631                .with_guarded_label_values(label_list),
1632        }
1633    }
1634
1635    pub fn new_materialize_metrics(
1636        &self,
1637        table_id: TableId,
1638        actor_id: ActorId,
1639        fragment_id: FragmentId,
1640    ) -> MaterializeMetrics {
1641        let label_list: &[&str; 3] = &[
1642            &actor_id.to_string(),
1643            &table_id.to_string(),
1644            &fragment_id.to_string(),
1645        ];
1646        MaterializeMetrics {
1647            materialize_cache_hit_count: self
1648                .materialize_cache_hit_count
1649                .with_guarded_label_values(label_list),
1650            materialize_cache_total_count: self
1651                .materialize_cache_total_count
1652                .with_guarded_label_values(label_list),
1653            materialize_input_row_count: self
1654                .materialize_input_row_count
1655                .with_guarded_label_values(label_list),
1656            materialize_current_epoch: self
1657                .materialize_current_epoch
1658                .with_guarded_label_values(label_list),
1659        }
1660    }
1661}
1662
1663pub(crate) struct ActorInputMetrics {
1664    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter<3>,
1665    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
1666}
1667
1668/// Tokio metrics for actors
1669pub struct ActorMetrics {
1670    pub actor_execution_time: LabelGuardedGauge<1>,
1671    pub actor_scheduled_duration: LabelGuardedGauge<1>,
1672    pub actor_scheduled_cnt: LabelGuardedIntGauge<1>,
1673    pub actor_fast_poll_duration: LabelGuardedGauge<1>,
1674    pub actor_fast_poll_cnt: LabelGuardedIntGauge<1>,
1675    pub actor_slow_poll_duration: LabelGuardedGauge<1>,
1676    pub actor_slow_poll_cnt: LabelGuardedIntGauge<1>,
1677    pub actor_poll_duration: LabelGuardedGauge<1>,
1678    pub actor_poll_cnt: LabelGuardedIntGauge<1>,
1679    pub actor_idle_duration: LabelGuardedGauge<1>,
1680    pub actor_idle_cnt: LabelGuardedIntGauge<1>,
1681}
1682
1683pub struct SinkExecutorMetrics {
1684    pub sink_input_row_count: LabelGuardedIntCounter<3>,
1685    pub sink_input_bytes: LabelGuardedIntCounter<3>,
1686    pub sink_chunk_buffer_size: LabelGuardedIntGauge<3>,
1687}
1688
1689pub struct MaterializeMetrics {
1690    pub materialize_cache_hit_count: LabelGuardedIntCounter<3>,
1691    pub materialize_cache_total_count: LabelGuardedIntCounter<3>,
1692    pub materialize_input_row_count: LabelGuardedIntCounter<3>,
1693    pub materialize_current_epoch: LabelGuardedIntGauge<3>,
1694}
1695
1696pub struct GroupTopNMetrics {
1697    pub group_top_n_cache_miss_count: LabelGuardedIntCounter<3>,
1698    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter<3>,
1699    pub group_top_n_cached_entry_count: LabelGuardedIntGauge<3>,
1700}
1701
1702pub struct LookupExecutorMetrics {
1703    pub lookup_cache_miss_count: LabelGuardedIntCounter<3>,
1704    pub lookup_total_query_cache_count: LabelGuardedIntCounter<3>,
1705    pub lookup_cached_entry_count: LabelGuardedIntGauge<3>,
1706}
1707
1708pub struct HashAggMetrics {
1709    pub agg_lookup_miss_count: LabelGuardedIntCounter<3>,
1710    pub agg_total_lookup_count: LabelGuardedIntCounter<3>,
1711    pub agg_cached_entry_count: LabelGuardedIntGauge<3>,
1712    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter<3>,
1713    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter<3>,
1714    pub agg_dirty_groups_count: LabelGuardedIntGauge<3>,
1715    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge<3>,
1716    pub agg_state_cache_lookup_count: LabelGuardedIntCounter<3>,
1717    pub agg_state_cache_miss_count: LabelGuardedIntCounter<3>,
1718}
1719
1720pub struct AggDistinctDedupMetrics {
1721    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter<3>,
1722    pub agg_distinct_total_cache_count: LabelGuardedIntCounter<3>,
1723    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge<3>,
1724}
1725
1726pub struct TemporalJoinMetrics {
1727    pub temporal_join_cache_miss_count: LabelGuardedIntCounter<3>,
1728    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter<3>,
1729    pub temporal_join_cached_entry_count: LabelGuardedIntGauge<3>,
1730}
1731
1732pub struct BackfillMetrics {
1733    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter<2>,
1734    pub backfill_upstream_output_row_count: LabelGuardedIntCounter<2>,
1735}
1736
1737pub struct CdcBackfillMetrics {
1738    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter<2>,
1739    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter<2>,
1740}
1741
1742pub struct OverWindowMetrics {
1743    pub over_window_cached_entry_count: LabelGuardedIntGauge<3>,
1744    pub over_window_cache_lookup_count: LabelGuardedIntCounter<3>,
1745    pub over_window_cache_miss_count: LabelGuardedIntCounter<3>,
1746    pub over_window_range_cache_entry_count: LabelGuardedIntGauge<3>,
1747    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter<3>,
1748    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter<3>,
1749    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter<3>,
1750    pub over_window_accessed_entry_count: LabelGuardedIntCounter<3>,
1751    pub over_window_compute_count: LabelGuardedIntCounter<3>,
1752    pub over_window_same_output_count: LabelGuardedIntCounter<3>,
1753}