risingwave_stream/executor/monitor/
streaming_stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::OnceLock;
16
17use prometheus::{
18    Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19    register_histogram_with_registry, register_int_counter_with_registry,
20    register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27    RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32    register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33    register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38    REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Collection of metric handles for the streaming engine, grouped by the
/// executor or subsystem that reports them.
///
/// A process-wide instance is created lazily by [`global_streaming_metrics`]
/// and handed out by clone (the struct derives `Clone`).
///
/// Fields whose type starts with `Relabeled` are produced in `new` by calling
/// `relabel_debug_1(level)` on the registered vector; per the comment in `new`,
/// this masks the first label (`actor_id`) if the level is less verbose than
/// `Debug`.
///
/// Fields without a `pub` qualifier are not reachable from outside this module.
/// NOTE(review): presumably they are exposed through helper methods on the
/// `impl` block — confirm against the rest of the file.
#[derive(Clone)]
pub struct StreamingMetrics {
    // NOTE(review): presumably the level passed to `new` — confirm against the
    // struct literal at the end of `new`.
    pub level: MetricLevel,

    // Executor metrics (disabled by default)
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // Profiling Metrics:
    // Aggregated per operator rather than per actor.
    // These are purely in-memory, never collected by prometheus.
    pub mem_stream_node_output_row_count: CountMap,
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // Streaming actor metrics from tokio (disabled by default)
    actor_execution_time: LabelGuardedGaugeVec,
    actor_scheduled_duration: LabelGuardedGaugeVec,
    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
    actor_fast_poll_duration: LabelGuardedGaugeVec,
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
    actor_slow_poll_duration: LabelGuardedGaugeVec,
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
    actor_poll_duration: LabelGuardedGaugeVec,
    actor_poll_cnt: LabelGuardedIntGaugeVec,
    actor_idle_duration: LabelGuardedGaugeVec,
    actor_idle_cnt: LabelGuardedIntGaugeVec,

    // Streaming actor
    pub actor_count: LabelGuardedIntGaugeVec,
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // Source
    pub source_output_row_count: LabelGuardedIntCounterVec,
    pub source_split_change_count: LabelGuardedIntCounterVec,
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // Sink
    sink_input_row_count: LabelGuardedIntCounterVec,
    sink_input_bytes: LabelGuardedIntCounterVec,
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // Exchange (see also `compute::ExchangeServiceMetrics`)
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // Streaming Merge (We break out this metric from `barrier_align_duration` because
    // the alignment happens on different levels)
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // Backpressure
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,

    // Streaming Join
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // Streaming Join, Streaming Dynamic Filter and Streaming Union
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // Streaming Aggregation
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // Streaming TopN
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    // TODO(rc): why not just use the above three?
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // Lookup executor
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // Temporal join
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // Backfill
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // CDC Backfill
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // Snapshot Backfill
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // Over Window
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    over_window_compute_count: LabelGuardedIntCounterVec,
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // Barrier
    /// The duration from receipt of barrier to all actors collection.
    /// And the max of all node `barrier_inflight_latency` is the latency for a barrier
    /// to flow through the graph.
    pub barrier_inflight_latency: Histogram,
    /// The duration of sync to storage.
    pub barrier_sync_latency: Histogram,
    pub barrier_batch_size: Histogram,
    /// The progress made by the earliest in-flight barriers in the local barrier manager.
    pub barrier_manager_progress: IntCounter,

    // KV log store
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Sync KV log store
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Memory management
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // Materialized view
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
}
217
/// Process-wide slot holding the shared [`StreamingMetrics`] instance.
///
/// Starts out empty; [`global_streaming_metrics`] fills it on first use via
/// `get_or_init` and every later call reads the stored value.
pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
219
220pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
221    GLOBAL_STREAMING_METRICS
222        .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
223        .clone()
224}
225
226impl StreamingMetrics {
227    fn new(registry: &Registry, level: MetricLevel) -> Self {
228        let executor_row_count = register_guarded_int_counter_vec_with_registry!(
229            "stream_executor_row_count",
230            "Total number of rows that have been output from each executor",
231            &["actor_id", "fragment_id", "executor_identity"],
232            registry
233        )
234        .unwrap()
235        .relabel_debug_1(level);
236
237        let stream_node_output_row_count = CountMap::new();
238        let stream_node_output_blocking_duration_ns = CountMap::new();
239
240        let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
241            "stream_source_output_rows_counts",
242            "Total number of rows that have been output from source",
243            &["source_id", "source_name", "actor_id", "fragment_id"],
244            registry
245        )
246        .unwrap();
247
248        let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
249            "stream_source_split_change_event_count",
250            "Total number of split change events that have been operated by source",
251            &["source_id", "source_name", "actor_id", "fragment_id"],
252            registry
253        )
254        .unwrap();
255
256        let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
257            "stream_source_backfill_rows_counts",
258            "Total number of rows that have been backfilled for source",
259            &["source_id", "source_name", "actor_id", "fragment_id"],
260            registry
261        )
262        .unwrap();
263
264        let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
265            "stream_sink_input_row_count",
266            "Total number of rows streamed into sink executors",
267            &["sink_id", "actor_id", "fragment_id"],
268            registry
269        )
270        .unwrap();
271
272        let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
273            "stream_sink_input_bytes",
274            "Total size of chunks streamed into sink executors",
275            &["sink_id", "actor_id", "fragment_id"],
276            registry
277        )
278        .unwrap();
279
280        let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
281            "stream_mview_input_row_count",
282            "Total number of rows streamed into materialize executors",
283            &["actor_id", "table_id", "fragment_id"],
284            registry
285        )
286        .unwrap()
287        .relabel_debug_1(level);
288
289        let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
290            "stream_mview_current_epoch",
291            "The current epoch of the materialized executor",
292            &["actor_id", "table_id", "fragment_id"],
293            registry
294        )
295        .unwrap()
296        .relabel_debug_1(level);
297
298        let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
299            "stream_sink_chunk_buffer_size",
300            "Total size of chunks buffered in a barrier",
301            &["sink_id", "actor_id", "fragment_id"],
302            registry
303        )
304        .unwrap();
305
306        let actor_execution_time = register_guarded_gauge_vec_with_registry!(
307            "stream_actor_actor_execution_time",
308            "Total execution time (s) of an actor",
309            &["actor_id"],
310            registry
311        )
312        .unwrap();
313
314        let actor_output_buffer_blocking_duration_ns =
315            register_guarded_int_counter_vec_with_registry!(
316                "stream_actor_output_buffer_blocking_duration_ns",
317                "Total blocking duration (ns) of output buffer",
318                &["actor_id", "fragment_id", "downstream_fragment_id"],
319                registry
320            )
321            .unwrap()
322            // mask the first label `actor_id` if the level is less verbose than `Debug`
323            .relabel_debug_1(level);
324
325        let actor_input_buffer_blocking_duration_ns =
326            register_guarded_int_counter_vec_with_registry!(
327                "stream_actor_input_buffer_blocking_duration_ns",
328                "Total blocking duration (ns) of input buffer",
329                &["actor_id", "fragment_id", "upstream_fragment_id"],
330                registry
331            )
332            .unwrap();
333
334        let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
335            "stream_exchange_frag_recv_size",
336            "Total size of messages that have been received from upstream Fragment",
337            &["up_fragment_id", "down_fragment_id"],
338            registry
339        )
340        .unwrap();
341
342        let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
343            "stream_actor_fast_poll_duration",
344            "tokio's metrics",
345            &["actor_id"],
346            registry
347        )
348        .unwrap();
349
350        let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
351            "stream_actor_fast_poll_cnt",
352            "tokio's metrics",
353            &["actor_id"],
354            registry
355        )
356        .unwrap();
357
358        let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
359            "stream_actor_slow_poll_duration",
360            "tokio's metrics",
361            &["actor_id"],
362            registry
363        )
364        .unwrap();
365
366        let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
367            "stream_actor_slow_poll_cnt",
368            "tokio's metrics",
369            &["actor_id"],
370            registry
371        )
372        .unwrap();
373
374        let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
375            "stream_actor_poll_duration",
376            "tokio's metrics",
377            &["actor_id"],
378            registry
379        )
380        .unwrap();
381
382        let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
383            "stream_actor_poll_cnt",
384            "tokio's metrics",
385            &["actor_id"],
386            registry
387        )
388        .unwrap();
389
390        let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
391            "stream_actor_scheduled_duration",
392            "tokio's metrics",
393            &["actor_id"],
394            registry
395        )
396        .unwrap();
397
398        let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
399            "stream_actor_scheduled_cnt",
400            "tokio's metrics",
401            &["actor_id"],
402            registry
403        )
404        .unwrap();
405
406        let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
407            "stream_actor_idle_duration",
408            "tokio's metrics",
409            &["actor_id"],
410            registry
411        )
412        .unwrap();
413
414        let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
415            "stream_actor_idle_cnt",
416            "tokio's metrics",
417            &["actor_id"],
418            registry
419        )
420        .unwrap();
421
422        let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
423            "stream_actor_in_record_cnt",
424            "Total number of rows actor received",
425            &["actor_id", "fragment_id", "upstream_fragment_id"],
426            registry
427        )
428        .unwrap()
429        .relabel_debug_1(level);
430
431        let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
432            "stream_actor_out_record_cnt",
433            "Total number of rows actor sent",
434            &["actor_id", "fragment_id"],
435            registry
436        )
437        .unwrap()
438        .relabel_debug_1(level);
439
440        let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
441            "stream_actor_current_epoch",
442            "Current epoch of actor",
443            &["actor_id", "fragment_id"],
444            registry
445        )
446        .unwrap()
447        .relabel_debug_1(level);
448
449        let actor_count = register_guarded_int_gauge_vec_with_registry!(
450            "stream_actor_count",
451            "Total number of actors (parallelism)",
452            &["fragment_id"],
453            registry
454        )
455        .unwrap();
456
457        let opts = histogram_opts!(
458            "stream_merge_barrier_align_duration",
459            "Duration of merge align barrier",
460            exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
461        );
462        let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
463            opts,
464            &["actor_id", "fragment_id"],
465            registry
466        )
467        .unwrap()
468        .relabel_debug_1(level);
469
470        let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
471            "stream_join_lookup_miss_count",
472            "Join executor lookup miss duration",
473            &["side", "join_table_id", "actor_id", "fragment_id"],
474            registry
475        )
476        .unwrap();
477
478        let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
479            "stream_join_lookup_total_count",
480            "Join executor lookup total operation",
481            &["side", "join_table_id", "actor_id", "fragment_id"],
482            registry
483        )
484        .unwrap();
485
486        let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
487            "stream_join_insert_cache_miss_count",
488            "Join executor cache miss when insert operation",
489            &["side", "join_table_id", "actor_id", "fragment_id"],
490            registry
491        )
492        .unwrap();
493
494        let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
495            "stream_join_actor_input_waiting_duration_ns",
496            "Total waiting duration (ns) of input buffer of join actor",
497            &["actor_id", "fragment_id"],
498            registry
499        )
500        .unwrap();
501
502        let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
503            "stream_join_match_duration_ns",
504            "Matching duration for each side",
505            &["actor_id", "fragment_id", "side"],
506            registry
507        )
508        .unwrap();
509
510        let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
511            "stream_barrier_align_duration_ns",
512            "Duration of join align barrier",
513            &["actor_id", "fragment_id", "wait_side", "executor"],
514            registry
515        )
516        .unwrap()
517        .relabel_debug_1(level);
518
519        let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
520            "stream_join_cached_entry_count",
521            "Number of cached entries in streaming join operators",
522            &["actor_id", "fragment_id", "side"],
523            registry
524        )
525        .unwrap();
526
527        let join_matched_join_keys_opts = histogram_opts!(
528            "stream_join_matched_join_keys",
529            "The number of keys matched in the opposite side",
530            exponential_buckets(16.0, 2.0, 28).unwrap() // max 2^31
531        );
532
533        let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
534            join_matched_join_keys_opts,
535            &["actor_id", "fragment_id", "table_id"],
536            registry
537        )
538        .unwrap()
539        .relabel_debug_1(level);
540
541        let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
542            "stream_agg_lookup_miss_count",
543            "Aggregation executor lookup miss duration",
544            &["table_id", "actor_id", "fragment_id"],
545            registry
546        )
547        .unwrap();
548
549        let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
550            "stream_agg_lookup_total_count",
551            "Aggregation executor lookup total operation",
552            &["table_id", "actor_id", "fragment_id"],
553            registry
554        )
555        .unwrap();
556
557        let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
558            "stream_agg_distinct_cache_miss_count",
559            "Aggregation executor dinsinct miss duration",
560            &["table_id", "actor_id", "fragment_id"],
561            registry
562        )
563        .unwrap();
564
565        let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
566            "stream_agg_distinct_total_cache_count",
567            "Aggregation executor distinct total operation",
568            &["table_id", "actor_id", "fragment_id"],
569            registry
570        )
571        .unwrap();
572
573        let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
574            "stream_agg_distinct_cached_entry_count",
575            "Total entry counts in distinct aggregation executor cache",
576            &["table_id", "actor_id", "fragment_id"],
577            registry
578        )
579        .unwrap();
580
581        let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
582            "stream_agg_dirty_groups_count",
583            "Total dirty group counts in aggregation executor",
584            &["table_id", "actor_id", "fragment_id"],
585            registry
586        )
587        .unwrap();
588
589        let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
590            "stream_agg_dirty_groups_heap_size",
591            "Total dirty group heap size in aggregation executor",
592            &["table_id", "actor_id", "fragment_id"],
593            registry
594        )
595        .unwrap();
596
597        let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
598            "stream_agg_state_cache_lookup_count",
599            "Aggregation executor state cache lookup count",
600            &["table_id", "actor_id", "fragment_id"],
601            registry
602        )
603        .unwrap();
604
605        let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
606            "stream_agg_state_cache_miss_count",
607            "Aggregation executor state cache miss count",
608            &["table_id", "actor_id", "fragment_id"],
609            registry
610        )
611        .unwrap();
612
613        let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
614            "stream_group_top_n_cache_miss_count",
615            "Group top n executor cache miss count",
616            &["table_id", "actor_id", "fragment_id"],
617            registry
618        )
619        .unwrap();
620
621        let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
622            "stream_group_top_n_total_query_cache_count",
623            "Group top n executor query cache total count",
624            &["table_id", "actor_id", "fragment_id"],
625            registry
626        )
627        .unwrap();
628
629        let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
630            "stream_group_top_n_cached_entry_count",
631            "Total entry counts in group top n executor cache",
632            &["table_id", "actor_id", "fragment_id"],
633            registry
634        )
635        .unwrap();
636
637        let group_top_n_appendonly_cache_miss_count =
638            register_guarded_int_counter_vec_with_registry!(
639                "stream_group_top_n_appendonly_cache_miss_count",
640                "Group top n appendonly executor cache miss count",
641                &["table_id", "actor_id", "fragment_id"],
642                registry
643            )
644            .unwrap();
645
646        let group_top_n_appendonly_total_query_cache_count =
647            register_guarded_int_counter_vec_with_registry!(
648                "stream_group_top_n_appendonly_total_query_cache_count",
649                "Group top n appendonly executor total cache count",
650                &["table_id", "actor_id", "fragment_id"],
651                registry
652            )
653            .unwrap();
654
655        let group_top_n_appendonly_cached_entry_count =
656            register_guarded_int_gauge_vec_with_registry!(
657                "stream_group_top_n_appendonly_cached_entry_count",
658                "Total entry counts in group top n appendonly executor cache",
659                &["table_id", "actor_id", "fragment_id"],
660                registry
661            )
662            .unwrap();
663
664        let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
665            "stream_lookup_cache_miss_count",
666            "Lookup executor cache miss count",
667            &["table_id", "actor_id", "fragment_id"],
668            registry
669        )
670        .unwrap();
671
672        let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
673            "stream_lookup_total_query_cache_count",
674            "Lookup executor query cache total count",
675            &["table_id", "actor_id", "fragment_id"],
676            registry
677        )
678        .unwrap();
679
680        let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
681            "stream_lookup_cached_entry_count",
682            "Total entry counts in lookup executor cache",
683            &["table_id", "actor_id", "fragment_id"],
684            registry
685        )
686        .unwrap();
687
688        let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
689            "stream_temporal_join_cache_miss_count",
690            "Temporal join executor cache miss count",
691            &["table_id", "actor_id", "fragment_id"],
692            registry
693        )
694        .unwrap();
695
696        let temporal_join_total_query_cache_count =
697            register_guarded_int_counter_vec_with_registry!(
698                "stream_temporal_join_total_query_cache_count",
699                "Temporal join executor query cache total count",
700                &["table_id", "actor_id", "fragment_id"],
701                registry
702            )
703            .unwrap();
704
705        let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
706            "stream_temporal_join_cached_entry_count",
707            "Total entry count in temporal join executor cache",
708            &["table_id", "actor_id", "fragment_id"],
709            registry
710        )
711        .unwrap();
712
713        let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
714            "stream_agg_cached_entry_count",
715            "Number of cached keys in streaming aggregation operators",
716            &["table_id", "actor_id", "fragment_id"],
717            registry
718        )
719        .unwrap();
720
721        let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
722            "stream_agg_chunk_lookup_miss_count",
723            "Aggregation executor chunk-level lookup miss duration",
724            &["table_id", "actor_id", "fragment_id"],
725            registry
726        )
727        .unwrap();
728
729        let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
730            "stream_agg_chunk_lookup_total_count",
731            "Aggregation executor chunk-level lookup total operation",
732            &["table_id", "actor_id", "fragment_id"],
733            registry
734        )
735        .unwrap();
736
737        let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
738            "stream_backfill_snapshot_read_row_count",
739            "Total number of rows that have been read from the backfill snapshot",
740            &["table_id", "actor_id"],
741            registry
742        )
743        .unwrap();
744
745        let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
746            "stream_backfill_upstream_output_row_count",
747            "Total number of rows that have been output from the backfill upstream",
748            &["table_id", "actor_id"],
749            registry
750        )
751        .unwrap();
752
753        let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
754            "stream_cdc_backfill_snapshot_read_row_count",
755            "Total number of rows that have been read from the cdc_backfill snapshot",
756            &["table_id", "actor_id"],
757            registry
758        )
759        .unwrap();
760
761        let cdc_backfill_upstream_output_row_count =
762            register_guarded_int_counter_vec_with_registry!(
763                "stream_cdc_backfill_upstream_output_row_count",
764                "Total number of rows that have been output from the cdc_backfill upstream",
765                &["table_id", "actor_id"],
766                registry
767            )
768            .unwrap();
769
770        let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
771            "stream_snapshot_backfill_consume_snapshot_row_count",
772            "Total number of rows that have been output from snapshot backfill",
773            &["table_id", "actor_id", "stage"],
774            registry
775        )
776        .unwrap();
777
778        let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
779            "stream_over_window_cached_entry_count",
780            "Total entry (partition) count in over window executor cache",
781            &["table_id", "actor_id", "fragment_id"],
782            registry
783        )
784        .unwrap();
785
786        let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
787            "stream_over_window_cache_lookup_count",
788            "Over window executor cache lookup count",
789            &["table_id", "actor_id", "fragment_id"],
790            registry
791        )
792        .unwrap();
793
794        let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
795            "stream_over_window_cache_miss_count",
796            "Over window executor cache miss count",
797            &["table_id", "actor_id", "fragment_id"],
798            registry
799        )
800        .unwrap();
801
802        let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
803            "stream_over_window_range_cache_entry_count",
804            "Over window partition range cache entry count",
805            &["table_id", "actor_id", "fragment_id"],
806            registry,
807        )
808        .unwrap();
809
810        let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
811            "stream_over_window_range_cache_lookup_count",
812            "Over window partition range cache lookup count",
813            &["table_id", "actor_id", "fragment_id"],
814            registry
815        )
816        .unwrap();
817
818        let over_window_range_cache_left_miss_count =
819            register_guarded_int_counter_vec_with_registry!(
820                "stream_over_window_range_cache_left_miss_count",
821                "Over window partition range cache left miss count",
822                &["table_id", "actor_id", "fragment_id"],
823                registry
824            )
825            .unwrap();
826
827        let over_window_range_cache_right_miss_count =
828            register_guarded_int_counter_vec_with_registry!(
829                "stream_over_window_range_cache_right_miss_count",
830                "Over window partition range cache right miss count",
831                &["table_id", "actor_id", "fragment_id"],
832                registry
833            )
834            .unwrap();
835
836        let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
837            "stream_over_window_accessed_entry_count",
838            "Over window accessed entry count",
839            &["table_id", "actor_id", "fragment_id"],
840            registry
841        )
842        .unwrap();
843
844        let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
845            "stream_over_window_compute_count",
846            "Over window compute count",
847            &["table_id", "actor_id", "fragment_id"],
848            registry
849        )
850        .unwrap();
851
852        let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
853            "stream_over_window_same_output_count",
854            "Over window same output count",
855            &["table_id", "actor_id", "fragment_id"],
856            registry
857        )
858        .unwrap();
859
860        let opts = histogram_opts!(
861            "stream_barrier_inflight_duration_seconds",
862            "barrier_inflight_latency",
863            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
864        );
865        let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
866
867        let opts = histogram_opts!(
868            "stream_barrier_sync_storage_duration_seconds",
869            "barrier_sync_latency",
870            exponential_buckets(0.1, 1.5, 16).unwrap() // max 43
871        );
872        let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
873
874        let opts = histogram_opts!(
875            "stream_barrier_batch_size",
876            "barrier_batch_size",
877            exponential_buckets(1.0, 2.0, 8).unwrap()
878        );
879        let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
880
881        let barrier_manager_progress = register_int_counter_with_registry!(
882            "stream_barrier_manager_progress",
883            "The number of actors that have processed the earliest in-flight barriers",
884            registry
885        )
886        .unwrap();
887
888        let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
889            "sync_kv_log_store_wait_next_poll_ns",
890            "Total duration (ns) of waiting for next poll",
891            &["actor_id", "target", "fragment_id", "relation"],
892            registry
893        )
894        .unwrap();
895
896        let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
897            "sync_kv_log_store_read_count",
898            "read row count throughput of sync_kv log store",
899            &["type", "actor_id", "target", "fragment_id", "relation"],
900            registry
901        )
902        .unwrap();
903
904        let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
905            "sync_kv_log_store_read_size",
906            "read size throughput of sync_kv log store",
907            &["type", "actor_id", "target", "fragment_id", "relation"],
908            registry
909        )
910        .unwrap();
911
912        let sync_kv_log_store_write_pause_duration_ns =
913            register_guarded_int_counter_vec_with_registry!(
914                "sync_kv_log_store_write_pause_duration_ns",
915                "Duration (ns) of sync_kv log store write pause",
916                &["actor_id", "target", "fragment_id", "relation"],
917                registry
918            )
919            .unwrap();
920
921        let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
922            "sync_kv_log_store_state",
923            "clean/unclean state transition for sync_kv log store",
924            &["state", "actor_id", "target", "fragment_id", "relation"],
925            registry
926        )
927        .unwrap();
928
929        let sync_kv_log_store_storage_write_count =
930            register_guarded_int_counter_vec_with_registry!(
931                "sync_kv_log_store_storage_write_count",
932                "Write row count throughput of sync_kv log store",
933                &["actor_id", "target", "fragment_id", "relation"],
934                registry
935            )
936            .unwrap();
937
938        let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
939            "sync_kv_log_store_storage_write_size",
940            "Write size throughput of sync_kv log store",
941            &["actor_id", "target", "fragment_id", "relation"],
942            registry
943        )
944        .unwrap();
945
946        let sync_kv_log_store_buffer_unconsumed_item_count =
947            register_guarded_int_gauge_vec_with_registry!(
948                "sync_kv_log_store_buffer_unconsumed_item_count",
949                "Number of Unconsumed Item in buffer",
950                &["actor_id", "target", "fragment_id", "relation"],
951                registry
952            )
953            .unwrap();
954
955        let sync_kv_log_store_buffer_unconsumed_row_count =
956            register_guarded_int_gauge_vec_with_registry!(
957                "sync_kv_log_store_buffer_unconsumed_row_count",
958                "Number of Unconsumed Row in buffer",
959                &["actor_id", "target", "fragment_id", "relation"],
960                registry
961            )
962            .unwrap();
963
964        let sync_kv_log_store_buffer_unconsumed_epoch_count =
965            register_guarded_int_gauge_vec_with_registry!(
966                "sync_kv_log_store_buffer_unconsumed_epoch_count",
967                "Number of Unconsumed Epoch in buffer",
968                &["actor_id", "target", "fragment_id", "relation"],
969                registry
970            )
971            .unwrap();
972
973        let sync_kv_log_store_buffer_unconsumed_min_epoch =
974            register_guarded_int_gauge_vec_with_registry!(
975                "sync_kv_log_store_buffer_unconsumed_min_epoch",
976                "Number of Unconsumed Epoch in buffer",
977                &["actor_id", "target", "fragment_id", "relation"],
978                registry
979            )
980            .unwrap();
981
982        let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
983            "kv_log_store_storage_write_count",
984            "Write row count throughput of kv log store",
985            &["actor_id", "connector", "sink_id", "sink_name"],
986            registry
987        )
988        .unwrap();
989
990        let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
991            "kv_log_store_storage_write_size",
992            "Write size throughput of kv log store",
993            &["actor_id", "connector", "sink_id", "sink_name"],
994            registry
995        )
996        .unwrap();
997
998        let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
999            "kv_log_store_storage_read_count",
1000            "Write row count throughput of kv log store",
1001            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1002            registry
1003        )
1004        .unwrap();
1005
1006        let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1007            "kv_log_store_storage_read_size",
1008            "Write size throughput of kv log store",
1009            &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1010            registry
1011        )
1012        .unwrap();
1013
1014        let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1015            "kv_log_store_rewind_count",
1016            "Kv log store rewind rate",
1017            &["actor_id", "connector", "sink_id", "sink_name"],
1018            registry
1019        )
1020        .unwrap();
1021
1022        let kv_log_store_rewind_delay_opts = {
1023            assert_eq!(2, REWIND_BACKOFF_FACTOR);
1024            let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1025                - REWIND_BASE_DELAY.as_secs_f64().log2())
1026            .ceil() as usize;
1027            let buckets = exponential_buckets(
1028                REWIND_BASE_DELAY.as_secs_f64(),
1029                REWIND_BACKOFF_FACTOR as _,
1030                bucket_count,
1031            )
1032            .unwrap();
1033            histogram_opts!(
1034                "kv_log_store_rewind_delay",
1035                "Kv log store rewind delay",
1036                buckets,
1037            )
1038        };
1039
1040        let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1041            kv_log_store_rewind_delay_opts,
1042            &["actor_id", "connector", "sink_id", "sink_name"],
1043            registry
1044        )
1045        .unwrap();
1046
1047        let kv_log_store_buffer_unconsumed_item_count =
1048            register_guarded_int_gauge_vec_with_registry!(
1049                "kv_log_store_buffer_unconsumed_item_count",
1050                "Number of Unconsumed Item in buffer",
1051                &["actor_id", "connector", "sink_id", "sink_name"],
1052                registry
1053            )
1054            .unwrap();
1055
1056        let kv_log_store_buffer_unconsumed_row_count =
1057            register_guarded_int_gauge_vec_with_registry!(
1058                "kv_log_store_buffer_unconsumed_row_count",
1059                "Number of Unconsumed Row in buffer",
1060                &["actor_id", "connector", "sink_id", "sink_name"],
1061                registry
1062            )
1063            .unwrap();
1064
1065        let kv_log_store_buffer_unconsumed_epoch_count =
1066            register_guarded_int_gauge_vec_with_registry!(
1067                "kv_log_store_buffer_unconsumed_epoch_count",
1068                "Number of Unconsumed Epoch in buffer",
1069                &["actor_id", "connector", "sink_id", "sink_name"],
1070                registry
1071            )
1072            .unwrap();
1073
1074        let kv_log_store_buffer_unconsumed_min_epoch =
1075            register_guarded_int_gauge_vec_with_registry!(
1076                "kv_log_store_buffer_unconsumed_min_epoch",
1077                "Number of Unconsumed Epoch in buffer",
1078                &["actor_id", "connector", "sink_id", "sink_name"],
1079                registry
1080            )
1081            .unwrap();
1082
1083        let lru_runtime_loop_count = register_int_counter_with_registry!(
1084            "lru_runtime_loop_count",
1085            "The counts of the eviction loop in LRU manager per second",
1086            registry
1087        )
1088        .unwrap();
1089
1090        let lru_latest_sequence = register_int_gauge_with_registry!(
1091            "lru_latest_sequence",
1092            "Current LRU global sequence",
1093            registry,
1094        )
1095        .unwrap();
1096
1097        let lru_watermark_sequence = register_int_gauge_with_registry!(
1098            "lru_watermark_sequence",
1099            "Current LRU watermark sequence",
1100            registry,
1101        )
1102        .unwrap();
1103
1104        let lru_eviction_policy = register_int_gauge_with_registry!(
1105            "lru_eviction_policy",
1106            "Current LRU eviction policy",
1107            registry,
1108        )
1109        .unwrap();
1110
1111        let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1112            "jemalloc_allocated_bytes",
1113            "The allocated memory jemalloc, got from jemalloc_ctl",
1114            registry
1115        )
1116        .unwrap();
1117
1118        let jemalloc_active_bytes = register_int_gauge_with_registry!(
1119            "jemalloc_active_bytes",
1120            "The active memory jemalloc, got from jemalloc_ctl",
1121            registry
1122        )
1123        .unwrap();
1124
1125        let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1126            "jemalloc_resident_bytes",
1127            "The active memory jemalloc, got from jemalloc_ctl",
1128            registry
1129        )
1130        .unwrap();
1131
1132        let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1133            "jemalloc_metadata_bytes",
1134            "The active memory jemalloc, got from jemalloc_ctl",
1135            registry
1136        )
1137        .unwrap();
1138
1139        let jvm_allocated_bytes = register_int_gauge_with_registry!(
1140            "jvm_allocated_bytes",
1141            "The allocated jvm memory",
1142            registry
1143        )
1144        .unwrap();
1145
1146        let jvm_active_bytes = register_int_gauge_with_registry!(
1147            "jvm_active_bytes",
1148            "The active jvm memory",
1149            registry
1150        )
1151        .unwrap();
1152
1153        let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1154            "stream_materialize_cache_hit_count",
1155            "Materialize executor cache hit count",
1156            &["actor_id", "table_id", "fragment_id"],
1157            registry
1158        )
1159        .unwrap()
1160        .relabel_debug_1(level);
1161
1162        let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1163            "stream_materialize_data_exist_count",
1164            "Materialize executor data exist count",
1165            &["actor_id", "table_id", "fragment_id"],
1166            registry
1167        )
1168        .unwrap()
1169        .relabel_debug_1(level);
1170
1171        let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1172            "stream_materialize_cache_total_count",
1173            "Materialize executor cache total operation",
1174            &["actor_id", "table_id", "fragment_id"],
1175            registry
1176        )
1177        .unwrap()
1178        .relabel_debug_1(level);
1179
1180        let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1181            "stream_memory_usage",
1182            "Memory usage for stream executors",
1183            &["actor_id", "table_id", "desc"],
1184            registry
1185        )
1186        .unwrap()
1187        .relabel_debug_1(level);
1188
1189        Self {
1190            level,
1191            executor_row_count,
1192            mem_stream_node_output_row_count: stream_node_output_row_count,
1193            mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1194            actor_execution_time,
1195            actor_scheduled_duration,
1196            actor_scheduled_cnt,
1197            actor_fast_poll_duration,
1198            actor_fast_poll_cnt,
1199            actor_slow_poll_duration,
1200            actor_slow_poll_cnt,
1201            actor_poll_duration,
1202            actor_poll_cnt,
1203            actor_idle_duration,
1204            actor_idle_cnt,
1205            actor_count,
1206            actor_in_record_cnt,
1207            actor_out_record_cnt,
1208            actor_current_epoch,
1209            source_output_row_count,
1210            source_split_change_count,
1211            source_backfill_row_count,
1212            sink_input_row_count,
1213            sink_input_bytes,
1214            sink_chunk_buffer_size,
1215            exchange_frag_recv_size,
1216            merge_barrier_align_duration,
1217            actor_output_buffer_blocking_duration_ns,
1218            actor_input_buffer_blocking_duration_ns,
1219            join_lookup_miss_count,
1220            join_lookup_total_count,
1221            join_insert_cache_miss_count,
1222            join_actor_input_waiting_duration_ns,
1223            join_match_duration_ns,
1224            join_cached_entry_count,
1225            join_matched_join_keys,
1226            barrier_align_duration,
1227            agg_lookup_miss_count,
1228            agg_total_lookup_count,
1229            agg_cached_entry_count,
1230            agg_chunk_lookup_miss_count,
1231            agg_chunk_total_lookup_count,
1232            agg_dirty_groups_count,
1233            agg_dirty_groups_heap_size,
1234            agg_distinct_cache_miss_count,
1235            agg_distinct_total_cache_count,
1236            agg_distinct_cached_entry_count,
1237            agg_state_cache_lookup_count,
1238            agg_state_cache_miss_count,
1239            group_top_n_cache_miss_count,
1240            group_top_n_total_query_cache_count,
1241            group_top_n_cached_entry_count,
1242            group_top_n_appendonly_cache_miss_count,
1243            group_top_n_appendonly_total_query_cache_count,
1244            group_top_n_appendonly_cached_entry_count,
1245            lookup_cache_miss_count,
1246            lookup_total_query_cache_count,
1247            lookup_cached_entry_count,
1248            temporal_join_cache_miss_count,
1249            temporal_join_total_query_cache_count,
1250            temporal_join_cached_entry_count,
1251            backfill_snapshot_read_row_count,
1252            backfill_upstream_output_row_count,
1253            cdc_backfill_snapshot_read_row_count,
1254            cdc_backfill_upstream_output_row_count,
1255            snapshot_backfill_consume_row_count,
1256            over_window_cached_entry_count,
1257            over_window_cache_lookup_count,
1258            over_window_cache_miss_count,
1259            over_window_range_cache_entry_count,
1260            over_window_range_cache_lookup_count,
1261            over_window_range_cache_left_miss_count,
1262            over_window_range_cache_right_miss_count,
1263            over_window_accessed_entry_count,
1264            over_window_compute_count,
1265            over_window_same_output_count,
1266            barrier_inflight_latency,
1267            barrier_sync_latency,
1268            barrier_batch_size,
1269            barrier_manager_progress,
1270            kv_log_store_storage_write_count,
1271            kv_log_store_storage_write_size,
1272            kv_log_store_rewind_count,
1273            kv_log_store_rewind_delay,
1274            kv_log_store_storage_read_count,
1275            kv_log_store_storage_read_size,
1276            kv_log_store_buffer_unconsumed_item_count,
1277            kv_log_store_buffer_unconsumed_row_count,
1278            kv_log_store_buffer_unconsumed_epoch_count,
1279            kv_log_store_buffer_unconsumed_min_epoch,
1280            sync_kv_log_store_read_count,
1281            sync_kv_log_store_read_size,
1282            sync_kv_log_store_write_pause_duration_ns,
1283            sync_kv_log_store_state,
1284            sync_kv_log_store_wait_next_poll_ns,
1285            sync_kv_log_store_storage_write_count,
1286            sync_kv_log_store_storage_write_size,
1287            sync_kv_log_store_buffer_unconsumed_item_count,
1288            sync_kv_log_store_buffer_unconsumed_row_count,
1289            sync_kv_log_store_buffer_unconsumed_epoch_count,
1290            sync_kv_log_store_buffer_unconsumed_min_epoch,
1291            lru_runtime_loop_count,
1292            lru_latest_sequence,
1293            lru_watermark_sequence,
1294            lru_eviction_policy,
1295            jemalloc_allocated_bytes,
1296            jemalloc_active_bytes,
1297            jemalloc_resident_bytes,
1298            jemalloc_metadata_bytes,
1299            jvm_allocated_bytes,
1300            jvm_active_bytes,
1301            stream_memory_usage,
1302            materialize_cache_hit_count,
1303            materialize_data_exist_count,
1304            materialize_cache_total_count,
1305            materialize_input_row_count,
1306            materialize_current_epoch,
1307        }
1308    }
1309
1310    /// Create a new `StreamingMetrics` instance used in tests or other places.
1311    pub fn unused() -> Self {
1312        global_streaming_metrics(MetricLevel::Disabled)
1313    }
1314
1315    pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1316        let label_list: &[&str; 1] = &[&actor_id.to_string()];
1317        let actor_execution_time = self
1318            .actor_execution_time
1319            .with_guarded_label_values(label_list);
1320        let actor_scheduled_duration = self
1321            .actor_scheduled_duration
1322            .with_guarded_label_values(label_list);
1323        let actor_scheduled_cnt = self
1324            .actor_scheduled_cnt
1325            .with_guarded_label_values(label_list);
1326        let actor_fast_poll_duration = self
1327            .actor_fast_poll_duration
1328            .with_guarded_label_values(label_list);
1329        let actor_fast_poll_cnt = self
1330            .actor_fast_poll_cnt
1331            .with_guarded_label_values(label_list);
1332        let actor_slow_poll_duration = self
1333            .actor_slow_poll_duration
1334            .with_guarded_label_values(label_list);
1335        let actor_slow_poll_cnt = self
1336            .actor_slow_poll_cnt
1337            .with_guarded_label_values(label_list);
1338        let actor_poll_duration = self
1339            .actor_poll_duration
1340            .with_guarded_label_values(label_list);
1341        let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1342        let actor_idle_duration = self
1343            .actor_idle_duration
1344            .with_guarded_label_values(label_list);
1345        let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1346        ActorMetrics {
1347            actor_execution_time,
1348            actor_scheduled_duration,
1349            actor_scheduled_cnt,
1350            actor_fast_poll_duration,
1351            actor_fast_poll_cnt,
1352            actor_slow_poll_duration,
1353            actor_slow_poll_cnt,
1354            actor_poll_duration,
1355            actor_poll_cnt,
1356            actor_idle_duration,
1357            actor_idle_cnt,
1358        }
1359    }
1360
1361    pub(crate) fn new_actor_input_metrics(
1362        &self,
1363        actor_id: ActorId,
1364        fragment_id: FragmentId,
1365        upstream_fragment_id: FragmentId,
1366    ) -> ActorInputMetrics {
1367        let actor_id_str = actor_id.to_string();
1368        let fragment_id_str = fragment_id.to_string();
1369        let upstream_fragment_id_str = upstream_fragment_id.to_string();
1370        ActorInputMetrics {
1371            actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1372                &actor_id_str,
1373                &fragment_id_str,
1374                &upstream_fragment_id_str,
1375            ]),
1376            actor_input_buffer_blocking_duration_ns: self
1377                .actor_input_buffer_blocking_duration_ns
1378                .with_guarded_label_values(&[
1379                    &actor_id_str,
1380                    &fragment_id_str,
1381                    &upstream_fragment_id_str,
1382                ]),
1383        }
1384    }
1385
1386    pub fn new_sink_exec_metrics(
1387        &self,
1388        id: SinkId,
1389        actor_id: ActorId,
1390        fragment_id: FragmentId,
1391    ) -> SinkExecutorMetrics {
1392        let label_list: &[&str; 3] = &[
1393            &id.to_string(),
1394            &actor_id.to_string(),
1395            &fragment_id.to_string(),
1396        ];
1397        SinkExecutorMetrics {
1398            sink_input_row_count: self
1399                .sink_input_row_count
1400                .with_guarded_label_values(label_list),
1401            sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1402            sink_chunk_buffer_size: self
1403                .sink_chunk_buffer_size
1404                .with_guarded_label_values(label_list),
1405        }
1406    }
1407
1408    pub fn new_group_top_n_metrics(
1409        &self,
1410        table_id: u32,
1411        actor_id: ActorId,
1412        fragment_id: FragmentId,
1413    ) -> GroupTopNMetrics {
1414        let label_list: &[&str; 3] = &[
1415            &table_id.to_string(),
1416            &actor_id.to_string(),
1417            &fragment_id.to_string(),
1418        ];
1419
1420        GroupTopNMetrics {
1421            group_top_n_cache_miss_count: self
1422                .group_top_n_cache_miss_count
1423                .with_guarded_label_values(label_list),
1424            group_top_n_total_query_cache_count: self
1425                .group_top_n_total_query_cache_count
1426                .with_guarded_label_values(label_list),
1427            group_top_n_cached_entry_count: self
1428                .group_top_n_cached_entry_count
1429                .with_guarded_label_values(label_list),
1430        }
1431    }
1432
1433    pub fn new_append_only_group_top_n_metrics(
1434        &self,
1435        table_id: u32,
1436        actor_id: ActorId,
1437        fragment_id: FragmentId,
1438    ) -> GroupTopNMetrics {
1439        let label_list: &[&str; 3] = &[
1440            &table_id.to_string(),
1441            &actor_id.to_string(),
1442            &fragment_id.to_string(),
1443        ];
1444
1445        GroupTopNMetrics {
1446            group_top_n_cache_miss_count: self
1447                .group_top_n_appendonly_cache_miss_count
1448                .with_guarded_label_values(label_list),
1449            group_top_n_total_query_cache_count: self
1450                .group_top_n_appendonly_total_query_cache_count
1451                .with_guarded_label_values(label_list),
1452            group_top_n_cached_entry_count: self
1453                .group_top_n_appendonly_cached_entry_count
1454                .with_guarded_label_values(label_list),
1455        }
1456    }
1457
1458    pub fn new_lookup_executor_metrics(
1459        &self,
1460        table_id: TableId,
1461        actor_id: ActorId,
1462        fragment_id: FragmentId,
1463    ) -> LookupExecutorMetrics {
1464        let label_list: &[&str; 3] = &[
1465            &table_id.to_string(),
1466            &actor_id.to_string(),
1467            &fragment_id.to_string(),
1468        ];
1469
1470        LookupExecutorMetrics {
1471            lookup_cache_miss_count: self
1472                .lookup_cache_miss_count
1473                .with_guarded_label_values(label_list),
1474            lookup_total_query_cache_count: self
1475                .lookup_total_query_cache_count
1476                .with_guarded_label_values(label_list),
1477            lookup_cached_entry_count: self
1478                .lookup_cached_entry_count
1479                .with_guarded_label_values(label_list),
1480        }
1481    }
1482
1483    pub fn new_hash_agg_metrics(
1484        &self,
1485        table_id: u32,
1486        actor_id: ActorId,
1487        fragment_id: FragmentId,
1488    ) -> HashAggMetrics {
1489        let label_list: &[&str; 3] = &[
1490            &table_id.to_string(),
1491            &actor_id.to_string(),
1492            &fragment_id.to_string(),
1493        ];
1494        HashAggMetrics {
1495            agg_lookup_miss_count: self
1496                .agg_lookup_miss_count
1497                .with_guarded_label_values(label_list),
1498            agg_total_lookup_count: self
1499                .agg_total_lookup_count
1500                .with_guarded_label_values(label_list),
1501            agg_cached_entry_count: self
1502                .agg_cached_entry_count
1503                .with_guarded_label_values(label_list),
1504            agg_chunk_lookup_miss_count: self
1505                .agg_chunk_lookup_miss_count
1506                .with_guarded_label_values(label_list),
1507            agg_chunk_total_lookup_count: self
1508                .agg_chunk_total_lookup_count
1509                .with_guarded_label_values(label_list),
1510            agg_dirty_groups_count: self
1511                .agg_dirty_groups_count
1512                .with_guarded_label_values(label_list),
1513            agg_dirty_groups_heap_size: self
1514                .agg_dirty_groups_heap_size
1515                .with_guarded_label_values(label_list),
1516            agg_state_cache_lookup_count: self
1517                .agg_state_cache_lookup_count
1518                .with_guarded_label_values(label_list),
1519            agg_state_cache_miss_count: self
1520                .agg_state_cache_miss_count
1521                .with_guarded_label_values(label_list),
1522        }
1523    }
1524
1525    pub fn new_agg_distinct_dedup_metrics(
1526        &self,
1527        table_id: u32,
1528        actor_id: ActorId,
1529        fragment_id: FragmentId,
1530    ) -> AggDistinctDedupMetrics {
1531        let label_list: &[&str; 3] = &[
1532            &table_id.to_string(),
1533            &actor_id.to_string(),
1534            &fragment_id.to_string(),
1535        ];
1536        AggDistinctDedupMetrics {
1537            agg_distinct_cache_miss_count: self
1538                .agg_distinct_cache_miss_count
1539                .with_guarded_label_values(label_list),
1540            agg_distinct_total_cache_count: self
1541                .agg_distinct_total_cache_count
1542                .with_guarded_label_values(label_list),
1543            agg_distinct_cached_entry_count: self
1544                .agg_distinct_cached_entry_count
1545                .with_guarded_label_values(label_list),
1546        }
1547    }
1548
1549    pub fn new_temporal_join_metrics(
1550        &self,
1551        table_id: TableId,
1552        actor_id: ActorId,
1553        fragment_id: FragmentId,
1554    ) -> TemporalJoinMetrics {
1555        let label_list: &[&str; 3] = &[
1556            &table_id.to_string(),
1557            &actor_id.to_string(),
1558            &fragment_id.to_string(),
1559        ];
1560        TemporalJoinMetrics {
1561            temporal_join_cache_miss_count: self
1562                .temporal_join_cache_miss_count
1563                .with_guarded_label_values(label_list),
1564            temporal_join_total_query_cache_count: self
1565                .temporal_join_total_query_cache_count
1566                .with_guarded_label_values(label_list),
1567            temporal_join_cached_entry_count: self
1568                .temporal_join_cached_entry_count
1569                .with_guarded_label_values(label_list),
1570        }
1571    }
1572
1573    pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1574        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1575        BackfillMetrics {
1576            backfill_snapshot_read_row_count: self
1577                .backfill_snapshot_read_row_count
1578                .with_guarded_label_values(label_list),
1579            backfill_upstream_output_row_count: self
1580                .backfill_upstream_output_row_count
1581                .with_guarded_label_values(label_list),
1582        }
1583    }
1584
1585    pub fn new_cdc_backfill_metrics(
1586        &self,
1587        table_id: TableId,
1588        actor_id: ActorId,
1589    ) -> CdcBackfillMetrics {
1590        let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1591        CdcBackfillMetrics {
1592            cdc_backfill_snapshot_read_row_count: self
1593                .cdc_backfill_snapshot_read_row_count
1594                .with_guarded_label_values(label_list),
1595            cdc_backfill_upstream_output_row_count: self
1596                .cdc_backfill_upstream_output_row_count
1597                .with_guarded_label_values(label_list),
1598        }
1599    }
1600
1601    pub fn new_over_window_metrics(
1602        &self,
1603        table_id: u32,
1604        actor_id: ActorId,
1605        fragment_id: FragmentId,
1606    ) -> OverWindowMetrics {
1607        let label_list: &[&str; 3] = &[
1608            &table_id.to_string(),
1609            &actor_id.to_string(),
1610            &fragment_id.to_string(),
1611        ];
1612        OverWindowMetrics {
1613            over_window_cached_entry_count: self
1614                .over_window_cached_entry_count
1615                .with_guarded_label_values(label_list),
1616            over_window_cache_lookup_count: self
1617                .over_window_cache_lookup_count
1618                .with_guarded_label_values(label_list),
1619            over_window_cache_miss_count: self
1620                .over_window_cache_miss_count
1621                .with_guarded_label_values(label_list),
1622            over_window_range_cache_entry_count: self
1623                .over_window_range_cache_entry_count
1624                .with_guarded_label_values(label_list),
1625            over_window_range_cache_lookup_count: self
1626                .over_window_range_cache_lookup_count
1627                .with_guarded_label_values(label_list),
1628            over_window_range_cache_left_miss_count: self
1629                .over_window_range_cache_left_miss_count
1630                .with_guarded_label_values(label_list),
1631            over_window_range_cache_right_miss_count: self
1632                .over_window_range_cache_right_miss_count
1633                .with_guarded_label_values(label_list),
1634            over_window_accessed_entry_count: self
1635                .over_window_accessed_entry_count
1636                .with_guarded_label_values(label_list),
1637            over_window_compute_count: self
1638                .over_window_compute_count
1639                .with_guarded_label_values(label_list),
1640            over_window_same_output_count: self
1641                .over_window_same_output_count
1642                .with_guarded_label_values(label_list),
1643        }
1644    }
1645
1646    pub fn new_materialize_metrics(
1647        &self,
1648        table_id: TableId,
1649        actor_id: ActorId,
1650        fragment_id: FragmentId,
1651    ) -> MaterializeMetrics {
1652        let label_list: &[&str; 3] = &[
1653            &actor_id.to_string(),
1654            &table_id.to_string(),
1655            &fragment_id.to_string(),
1656        ];
1657        MaterializeMetrics {
1658            materialize_cache_hit_count: self
1659                .materialize_cache_hit_count
1660                .with_guarded_label_values(label_list),
1661            materialize_data_exist_count: self
1662                .materialize_data_exist_count
1663                .with_guarded_label_values(label_list),
1664            materialize_cache_total_count: self
1665                .materialize_cache_total_count
1666                .with_guarded_label_values(label_list),
1667            materialize_input_row_count: self
1668                .materialize_input_row_count
1669                .with_guarded_label_values(label_list),
1670            materialize_current_epoch: self
1671                .materialize_current_epoch
1672                .with_guarded_label_values(label_list),
1673        }
1674    }
1675}
1676
/// Crate-internal pair of label-guarded counters for an actor's input side.
///
/// NOTE(review): judging by the field names, these track the number of input records and the
/// time in nanoseconds spent blocked on the input buffer; confirm at the update sites.
pub(crate) struct ActorInputMetrics {
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1681
/// Tokio metrics for actors.
///
/// The `*_time` / `*_duration` members are [`LabelGuardedGauge`]s and each `*_cnt` member is a
/// [`LabelGuardedIntGauge`].
///
/// NOTE(review): the units of the duration gauges are not visible in this struct; check the
/// code that sets them.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1696
/// Label-guarded metric handles for a sink executor: two counters and one gauge.
///
/// NOTE(review): by name, the counters track input rows and input bytes and the gauge tracks
/// the chunk buffer size; confirm at the update sites.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1702
/// Label-guarded metric handles produced by `new_materialize_metrics`.
///
/// That constructor binds every member to one label tuple of `actor_id`, `table_id` and
/// `fragment_id`, in that order. Four members are counters and `materialize_current_epoch` is
/// a gauge.
pub struct MaterializeMetrics {
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1710
/// Label-guarded cache metric handles for group top-n: two counters and one gauge.
///
/// NOTE(review): by name, these are cache misses, total cache queries and the current number
/// of cached entries; confirm at the update sites.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1716
/// Label-guarded cache metric handles for a lookup executor: two counters and one gauge.
///
/// NOTE(review): by name, these are cache misses, total cache queries and the current number
/// of cached entries; confirm at the update sites.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1722
/// Label-guarded metric handles produced by `new_hash_agg_metrics`.
///
/// That constructor binds every member to one label tuple of `table_id`, `actor_id` and
/// `fragment_id`, in that order. The `agg_cached_entry_count` and `agg_dirty_groups_*` members
/// are gauges; all other members are counters.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1734
/// Label-guarded metric handles produced by `new_agg_distinct_dedup_metrics`.
///
/// That constructor binds every member to one label tuple of `table_id`, `actor_id` and
/// `fragment_id`, in that order. Two members are counters and the cached-entry count is a
/// gauge.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1740
/// Label-guarded metric handles produced by `new_temporal_join_metrics`.
///
/// That constructor binds every member to one label tuple of `table_id`, `actor_id` and
/// `fragment_id`, in that order. Two members are counters and the cached-entry count is a
/// gauge.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1746
/// Label-guarded counters produced by `new_backfill_metrics`.
///
/// That constructor binds both members to one label pair of `table_id` and `actor_id`, in that
/// order.
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1751
/// Label-guarded counters produced by `new_cdc_backfill_metrics`.
///
/// That constructor binds both members to one label pair of `table_id` and `actor_id`, in that
/// order.
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1756
/// Label-guarded metric handles produced by `new_over_window_metrics`.
///
/// That constructor binds every member to one label tuple of `table_id`, `actor_id` and
/// `fragment_id`, in that order. The two `*_entry_count` cache-size members are gauges; all
/// other members are counters.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}