1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26 LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
/// Metrics for the streaming engine.
///
/// `StreamingMetrics::new` registers (almost) every field with the Prometheus
/// [`Registry`] it is given. The exceptions visible there are the two `CountMap`
/// fields, which are created with `CountMap::new()`. The `Relabeled*` fields are
/// built with `.relabel_debug_1(level)`. NOTE(review): presumably that drops some
/// labels when the metric level is below Debug; confirm against `MetricVecRelabelExt`.
///
/// A process-wide instance is shared through `global_streaming_metrics`, which
/// returns clones of it.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level this instance was configured with. `new` passes its `level`
    /// argument to every `relabel_debug_1` call.
    pub level: MetricLevel,

    // ---- Executor ----
    /// `stream_executor_row_count`: rows output from each executor
    /// (labels: `actor_id`, `fragment_id`, `executor_identity`).
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // ---- In-memory stream-node counters ----
    // Created with `CountMap::new()` in `new`, i.e. not registered with the
    // Prometheus registry there.
    /// Per-`ExecutorId` counter for stream-node output rows.
    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
    /// Per-`ExecutorId` counter for stream-node output blocking duration (ns).
    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,

    // ---- Actor scheduling ----
    // Help text for all six is "tokio's metrics"; labels: `actor_id`, `fragment_id`.
    /// `stream_actor_scheduled_duration`.
    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_scheduled_cnt`.
    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_poll_duration`.
    actor_poll_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_poll_cnt`.
    actor_poll_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_idle_duration`.
    actor_idle_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_idle_cnt`.
    actor_idle_cnt: RelabeledGuardedIntCounterVec,

    // ---- Actor ----
    /// `stream_actor_count`: total number of actors (parallelism), per `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: rows received by an actor
    /// (labels: `actor_id`, `fragment_id`, `upstream_fragment_id`).
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: rows sent by an actor
    /// (labels: `actor_id`, `fragment_id`).
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor
    /// (labels: `actor_id`, `fragment_id`).
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Source ----
    // Labels: `source_id`, `source_name`, `actor_id`, `fragment_id`.
    /// `stream_source_output_rows_counts`: rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled by a source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // ---- Sink ----
    // Labels: `sink_id`, `actor_id`, `fragment_id`.
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size` (gauge): total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // ---- Exchange ----
    /// `stream_exchange_frag_recv_size`: total size of messages received from an
    /// upstream fragment (labels: `up_fragment_id`, `down_fragment_id`).
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // ---- Merge ----
    /// `stream_merge_barrier_align_duration_ns`: total merge barrier alignment
    /// duration in ns (labels: `actor_id`, `fragment_id`).
    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Actor buffers ----
    /// `stream_actor_output_buffer_blocking_duration_ns`
    /// (labels: `actor_id`, `fragment_id`, `downstream_fragment_id`).
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`
    /// (labels: `actor_id`, `fragment_id`, `upstream_fragment_id`).
    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,

    // ---- Hash join ----
    /// `stream_join_lookup_miss_count` (labels: `side`, `join_table_id`, `actor_id`, `fragment_id`).
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count` (labels: `side`, `join_table_id`, `actor_id`, `fragment_id`).
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert
    /// (labels: `side`, `join_table_id`, `actor_id`, `fragment_id`).
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: waiting time (ns) on the join
    /// actor's input buffer (labels: `actor_id`, `fragment_id`).
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: matching duration per side
    /// (labels: `actor_id`, `fragment_id`, `side`).
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count` (gauge) (labels: `actor_id`, `fragment_id`, `side`).
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys` histogram: keys matched on the opposite side.
    /// Buckets are `16 * 2^k` for 28 buckets (labels: `actor_id`, `fragment_id`, `table_id`).
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // ---- Barrier alignment ----
    /// `stream_barrier_align_duration_ns`
    /// (labels: `actor_id`, `fragment_id`, `wait_side`, `executor`).
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Hash aggregation ----
    // Labels: `table_id`, `actor_id`, `fragment_id`.
    /// `stream_agg_lookup_miss_count`.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_lookup_total_count`.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_cached_entry_count` (gauge): cached keys.
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_chunk_lookup_miss_count`: chunk-level lookup misses.
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_chunk_lookup_total_count`: chunk-level lookups.
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_dirty_groups_count` (gauge).
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_dirty_groups_heap_size` (gauge).
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    /// `stream_agg_distinct_cache_miss_count`.
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_total_cache_count`.
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_cached_entry_count` (gauge).
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_state_cache_lookup_count`.
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_state_cache_miss_count`.
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // ---- Group top-N ----
    // Labels: `table_id`, `actor_id`, `fragment_id`; metric name is `stream_` + field name.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    /// Gauge.
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    /// Gauge.
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Lookup executor ----
    // Labels: `table_id`, `actor_id`, `fragment_id`; metric name is `stream_` + field name.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    /// Gauge.
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Temporal join ----
    // Labels: `table_id`, `actor_id`, `fragment_id`; metric name is `stream_` + field name.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    /// Gauge.
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Backfill ----
    // Labels: `table_id`, `actor_id`; metric name is `stream_` + field name.
    /// Rows read from the backfill snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Rows output from the backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- CDC backfill ----
    // Labels: `table_id`, `actor_id`; metric name is `stream_` + field name.
    /// Rows read from the cdc_backfill snapshot.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Rows output from the cdc_backfill upstream.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- Snapshot backfill ----
    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output from
    /// snapshot backfill (labels: `table_id`, `actor_id`, `stage`).
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // ---- Over window ----
    // Labels: `table_id`, `actor_id`, `fragment_id`; metric name is `stream_` + field name.
    /// Gauge: cached entries (partitions).
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    /// Gauge: partition range cache entries.
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    over_window_compute_count: LabelGuardedIntCounterVec,
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // ---- Barrier manager (unlabeled) ----
    /// `stream_barrier_inflight_duration_seconds` histogram; buckets start at 0.1 and
    /// grow by a factor of 1.5 (16 buckets).
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds` histogram; same buckets as
    /// `barrier_inflight_latency`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size` histogram; buckets 1, 2, 4, ..., 128.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have processed the
    /// earliest in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // ---- KV log store ----
    // `kv_log_store_storage_write_count` is registered under its field name with labels
    // `actor_id`, `connector`, `sink_id`, `sink_name`. The remaining fields are
    // registered past the part of `new` reviewed here.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Sync KV log store ----
    // Metric name equals the field name. Common labels: `actor_id`, `target`,
    // `fragment_id`, `relation`.
    /// Read row count; extra label `type`.
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// Read size; extra label `type`.
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// Duration (ns) of write pauses.
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// Clean/unclean state transitions; extra label `state`.
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// Total duration (ns) spent waiting for the next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// Write row count.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// Write size.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// Gauge: unconsumed items in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// Gauge: unconsumed rows in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// Gauge: unconsumed epochs in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// Gauge; per the field name, the minimum unconsumed epoch in the buffer.
    /// NOTE(review): its registered help text duplicates the epoch-count one.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Memory management ----
    // LRU, jemalloc and JVM gauges plus `stream_memory_usage`. They are registered past
    // the part of `new` reviewed here; see there for metric names and units.
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // ---- Materialize ----
    // The three cache/exist counters are registered past the part of `new` reviewed here.
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: rows streamed into materialize executors
    /// (labels: `actor_id`, `table_id`, `fragment_id`).
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor
    /// (labels: `actor_id`, `table_id`, `fragment_id`).
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Postgres CDC (label: `source_id`) ----
    /// `stream_pg_cdc_state_table_lsn`: LSN stored in the PostgreSQL CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN at the time JNI commit offset is called.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,

    // ---- MySQL CDC (label: `source_id`) ----
    /// `stream_mysql_cdc_state_binlog_file_seq`: binlog file sequence number stored in
    /// the MySQL CDC state table.
    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
    /// `stream_mysql_cdc_state_binlog_position`: binlog position stored in the MySQL
    /// CDC state table.
    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,

    // ---- Gap fill ----
    // Registered past the part of `new` reviewed here.
    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,

    // ---- State table access ----
    // Registered past the part of `new` reviewed here.
    pub state_table_iter_count: RelabeledGuardedIntCounterVec,
    pub state_table_get_count: RelabeledGuardedIntCounterVec,
    pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
    pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
}
230
231pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
232
233pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
234 GLOBAL_STREAMING_METRICS
235 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
236 .clone()
237}
238
239impl StreamingMetrics {
240 fn new(registry: &Registry, level: MetricLevel) -> Self {
241 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
242 "stream_executor_row_count",
243 "Total number of rows that have been output from each executor",
244 &["actor_id", "fragment_id", "executor_identity"],
245 registry
246 )
247 .unwrap()
248 .relabel_debug_1(level);
249
250 let stream_node_output_row_count = CountMap::new();
251 let stream_node_output_blocking_duration_ns = CountMap::new();
252
253 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
254 "stream_source_output_rows_counts",
255 "Total number of rows that have been output from source",
256 &["source_id", "source_name", "actor_id", "fragment_id"],
257 registry
258 )
259 .unwrap();
260
261 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
262 "stream_source_split_change_event_count",
263 "Total number of split change events that have been operated by source",
264 &["source_id", "source_name", "actor_id", "fragment_id"],
265 registry
266 )
267 .unwrap();
268
269 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
270 "stream_source_backfill_rows_counts",
271 "Total number of rows that have been backfilled for source",
272 &["source_id", "source_name", "actor_id", "fragment_id"],
273 registry
274 )
275 .unwrap();
276
277 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
278 "stream_sink_input_row_count",
279 "Total number of rows streamed into sink executors",
280 &["sink_id", "actor_id", "fragment_id"],
281 registry
282 )
283 .unwrap();
284
285 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
286 "stream_sink_input_bytes",
287 "Total size of chunks streamed into sink executors",
288 &["sink_id", "actor_id", "fragment_id"],
289 registry
290 )
291 .unwrap();
292
293 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
294 "stream_mview_input_row_count",
295 "Total number of rows streamed into materialize executors",
296 &["actor_id", "table_id", "fragment_id"],
297 registry
298 )
299 .unwrap()
300 .relabel_debug_1(level);
301
302 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
303 "stream_mview_current_epoch",
304 "The current epoch of the materialized executor",
305 &["actor_id", "table_id", "fragment_id"],
306 registry
307 )
308 .unwrap()
309 .relabel_debug_1(level);
310
311 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
312 "stream_pg_cdc_state_table_lsn",
313 "Current LSN value stored in PostgreSQL CDC state table",
314 &["source_id"],
315 registry,
316 )
317 .unwrap();
318
319 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
320 "stream_pg_cdc_jni_commit_offset_lsn",
321 "LSN value when JNI commit offset is called for PostgreSQL CDC",
322 &["source_id"],
323 registry,
324 )
325 .unwrap();
326
327 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
328 "stream_mysql_cdc_state_binlog_file_seq",
329 "Current binlog file sequence number stored in MySQL CDC state table",
330 &["source_id"],
331 registry,
332 )
333 .unwrap();
334
335 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
336 "stream_mysql_cdc_state_binlog_position",
337 "Current binlog position stored in MySQL CDC state table",
338 &["source_id"],
339 registry,
340 )
341 .unwrap();
342
343 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
344 "stream_sink_chunk_buffer_size",
345 "Total size of chunks buffered in a barrier",
346 &["sink_id", "actor_id", "fragment_id"],
347 registry
348 )
349 .unwrap();
350 let actor_output_buffer_blocking_duration_ns =
351 register_guarded_int_counter_vec_with_registry!(
352 "stream_actor_output_buffer_blocking_duration_ns",
353 "Total blocking duration (ns) of output buffer",
354 &["actor_id", "fragment_id", "downstream_fragment_id"],
355 registry
356 )
357 .unwrap()
358 .relabel_debug_1(level);
360
361 let actor_input_buffer_blocking_duration_ns =
362 register_guarded_int_counter_vec_with_registry!(
363 "stream_actor_input_buffer_blocking_duration_ns",
364 "Total blocking duration (ns) of input buffer",
365 &["actor_id", "fragment_id", "upstream_fragment_id"],
366 registry
367 )
368 .unwrap()
369 .relabel_debug_1(level);
371
372 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
373 "stream_exchange_frag_recv_size",
374 "Total size of messages that have been received from upstream Fragment",
375 &["up_fragment_id", "down_fragment_id"],
376 registry
377 )
378 .unwrap();
379
380 let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
381 "stream_actor_poll_duration",
382 "tokio's metrics",
383 &["actor_id", "fragment_id"],
384 registry
385 )
386 .unwrap()
387 .relabel_debug_1(level);
388
389 let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
390 "stream_actor_poll_cnt",
391 "tokio's metrics",
392 &["actor_id", "fragment_id"],
393 registry
394 )
395 .unwrap()
396 .relabel_debug_1(level);
397
398 let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
399 "stream_actor_scheduled_duration",
400 "tokio's metrics",
401 &["actor_id", "fragment_id"],
402 registry
403 )
404 .unwrap()
405 .relabel_debug_1(level);
406
407 let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
408 "stream_actor_scheduled_cnt",
409 "tokio's metrics",
410 &["actor_id", "fragment_id"],
411 registry
412 )
413 .unwrap()
414 .relabel_debug_1(level);
415
416 let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
417 "stream_actor_idle_duration",
418 "tokio's metrics",
419 &["actor_id", "fragment_id"],
420 registry
421 )
422 .unwrap()
423 .relabel_debug_1(level);
424
425 let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
426 "stream_actor_idle_cnt",
427 "tokio's metrics",
428 &["actor_id", "fragment_id"],
429 registry
430 )
431 .unwrap()
432 .relabel_debug_1(level);
433
434 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
435 "stream_actor_in_record_cnt",
436 "Total number of rows actor received",
437 &["actor_id", "fragment_id", "upstream_fragment_id"],
438 registry
439 )
440 .unwrap()
441 .relabel_debug_1(level);
442
443 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
444 "stream_actor_out_record_cnt",
445 "Total number of rows actor sent",
446 &["actor_id", "fragment_id"],
447 registry
448 )
449 .unwrap()
450 .relabel_debug_1(level);
451
452 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
453 "stream_actor_current_epoch",
454 "Current epoch of actor",
455 &["actor_id", "fragment_id"],
456 registry
457 )
458 .unwrap()
459 .relabel_debug_1(level);
460
461 let actor_count = register_guarded_int_gauge_vec_with_registry!(
462 "stream_actor_count",
463 "Total number of actors (parallelism)",
464 &["fragment_id"],
465 registry
466 )
467 .unwrap();
468
469 let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
470 "stream_merge_barrier_align_duration_ns",
471 "Total merge barrier alignment duration (ns)",
472 &["actor_id", "fragment_id"],
473 registry
474 )
475 .unwrap()
476 .relabel_debug_1(level);
477
478 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
479 "stream_join_lookup_miss_count",
480 "Join executor lookup miss duration",
481 &["side", "join_table_id", "actor_id", "fragment_id"],
482 registry
483 )
484 .unwrap();
485
486 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
487 "stream_join_lookup_total_count",
488 "Join executor lookup total operation",
489 &["side", "join_table_id", "actor_id", "fragment_id"],
490 registry
491 )
492 .unwrap();
493
494 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
495 "stream_join_insert_cache_miss_count",
496 "Join executor cache miss when insert operation",
497 &["side", "join_table_id", "actor_id", "fragment_id"],
498 registry
499 )
500 .unwrap();
501
502 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
503 "stream_join_actor_input_waiting_duration_ns",
504 "Total waiting duration (ns) of input buffer of join actor",
505 &["actor_id", "fragment_id"],
506 registry
507 )
508 .unwrap();
509
510 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
511 "stream_join_match_duration_ns",
512 "Matching duration for each side",
513 &["actor_id", "fragment_id", "side"],
514 registry
515 )
516 .unwrap();
517
518 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
519 "stream_barrier_align_duration_ns",
520 "Duration of join align barrier",
521 &["actor_id", "fragment_id", "wait_side", "executor"],
522 registry
523 )
524 .unwrap()
525 .relabel_debug_1(level);
526
527 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
528 "stream_join_cached_entry_count",
529 "Number of cached entries in streaming join operators",
530 &["actor_id", "fragment_id", "side"],
531 registry
532 )
533 .unwrap();
534
535 let join_matched_join_keys_opts = histogram_opts!(
536 "stream_join_matched_join_keys",
537 "The number of keys matched in the opposite side",
538 exponential_buckets(16.0, 2.0, 28).unwrap() );
540
541 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
542 join_matched_join_keys_opts,
543 &["actor_id", "fragment_id", "table_id"],
544 registry
545 )
546 .unwrap()
547 .relabel_debug_1(level);
548
549 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
550 "stream_agg_lookup_miss_count",
551 "Aggregation executor lookup miss duration",
552 &["table_id", "actor_id", "fragment_id"],
553 registry
554 )
555 .unwrap();
556
557 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
558 "stream_agg_lookup_total_count",
559 "Aggregation executor lookup total operation",
560 &["table_id", "actor_id", "fragment_id"],
561 registry
562 )
563 .unwrap();
564
565 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
566 "stream_agg_distinct_cache_miss_count",
567 "Aggregation executor dinsinct miss duration",
568 &["table_id", "actor_id", "fragment_id"],
569 registry
570 )
571 .unwrap();
572
573 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
574 "stream_agg_distinct_total_cache_count",
575 "Aggregation executor distinct total operation",
576 &["table_id", "actor_id", "fragment_id"],
577 registry
578 )
579 .unwrap();
580
581 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
582 "stream_agg_distinct_cached_entry_count",
583 "Total entry counts in distinct aggregation executor cache",
584 &["table_id", "actor_id", "fragment_id"],
585 registry
586 )
587 .unwrap();
588
589 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
590 "stream_agg_dirty_groups_count",
591 "Total dirty group counts in aggregation executor",
592 &["table_id", "actor_id", "fragment_id"],
593 registry
594 )
595 .unwrap();
596
597 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
598 "stream_agg_dirty_groups_heap_size",
599 "Total dirty group heap size in aggregation executor",
600 &["table_id", "actor_id", "fragment_id"],
601 registry
602 )
603 .unwrap();
604
605 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
606 "stream_agg_state_cache_lookup_count",
607 "Aggregation executor state cache lookup count",
608 &["table_id", "actor_id", "fragment_id"],
609 registry
610 )
611 .unwrap();
612
613 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
614 "stream_agg_state_cache_miss_count",
615 "Aggregation executor state cache miss count",
616 &["table_id", "actor_id", "fragment_id"],
617 registry
618 )
619 .unwrap();
620
621 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
622 "stream_group_top_n_cache_miss_count",
623 "Group top n executor cache miss count",
624 &["table_id", "actor_id", "fragment_id"],
625 registry
626 )
627 .unwrap();
628
629 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
630 "stream_group_top_n_total_query_cache_count",
631 "Group top n executor query cache total count",
632 &["table_id", "actor_id", "fragment_id"],
633 registry
634 )
635 .unwrap();
636
637 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
638 "stream_group_top_n_cached_entry_count",
639 "Total entry counts in group top n executor cache",
640 &["table_id", "actor_id", "fragment_id"],
641 registry
642 )
643 .unwrap();
644
645 let group_top_n_appendonly_cache_miss_count =
646 register_guarded_int_counter_vec_with_registry!(
647 "stream_group_top_n_appendonly_cache_miss_count",
648 "Group top n appendonly executor cache miss count",
649 &["table_id", "actor_id", "fragment_id"],
650 registry
651 )
652 .unwrap();
653
654 let group_top_n_appendonly_total_query_cache_count =
655 register_guarded_int_counter_vec_with_registry!(
656 "stream_group_top_n_appendonly_total_query_cache_count",
657 "Group top n appendonly executor total cache count",
658 &["table_id", "actor_id", "fragment_id"],
659 registry
660 )
661 .unwrap();
662
663 let group_top_n_appendonly_cached_entry_count =
664 register_guarded_int_gauge_vec_with_registry!(
665 "stream_group_top_n_appendonly_cached_entry_count",
666 "Total entry counts in group top n appendonly executor cache",
667 &["table_id", "actor_id", "fragment_id"],
668 registry
669 )
670 .unwrap();
671
672 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
673 "stream_lookup_cache_miss_count",
674 "Lookup executor cache miss count",
675 &["table_id", "actor_id", "fragment_id"],
676 registry
677 )
678 .unwrap();
679
680 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
681 "stream_lookup_total_query_cache_count",
682 "Lookup executor query cache total count",
683 &["table_id", "actor_id", "fragment_id"],
684 registry
685 )
686 .unwrap();
687
688 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
689 "stream_lookup_cached_entry_count",
690 "Total entry counts in lookup executor cache",
691 &["table_id", "actor_id", "fragment_id"],
692 registry
693 )
694 .unwrap();
695
696 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
697 "stream_temporal_join_cache_miss_count",
698 "Temporal join executor cache miss count",
699 &["table_id", "actor_id", "fragment_id"],
700 registry
701 )
702 .unwrap();
703
704 let temporal_join_total_query_cache_count =
705 register_guarded_int_counter_vec_with_registry!(
706 "stream_temporal_join_total_query_cache_count",
707 "Temporal join executor query cache total count",
708 &["table_id", "actor_id", "fragment_id"],
709 registry
710 )
711 .unwrap();
712
713 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
714 "stream_temporal_join_cached_entry_count",
715 "Total entry count in temporal join executor cache",
716 &["table_id", "actor_id", "fragment_id"],
717 registry
718 )
719 .unwrap();
720
721 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
722 "stream_agg_cached_entry_count",
723 "Number of cached keys in streaming aggregation operators",
724 &["table_id", "actor_id", "fragment_id"],
725 registry
726 )
727 .unwrap();
728
729 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
730 "stream_agg_chunk_lookup_miss_count",
731 "Aggregation executor chunk-level lookup miss duration",
732 &["table_id", "actor_id", "fragment_id"],
733 registry
734 )
735 .unwrap();
736
737 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
738 "stream_agg_chunk_lookup_total_count",
739 "Aggregation executor chunk-level lookup total operation",
740 &["table_id", "actor_id", "fragment_id"],
741 registry
742 )
743 .unwrap();
744
745 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
746 "stream_backfill_snapshot_read_row_count",
747 "Total number of rows that have been read from the backfill snapshot",
748 &["table_id", "actor_id"],
749 registry
750 )
751 .unwrap();
752
753 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
754 "stream_backfill_upstream_output_row_count",
755 "Total number of rows that have been output from the backfill upstream",
756 &["table_id", "actor_id"],
757 registry
758 )
759 .unwrap();
760
761 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
762 "stream_cdc_backfill_snapshot_read_row_count",
763 "Total number of rows that have been read from the cdc_backfill snapshot",
764 &["table_id", "actor_id"],
765 registry
766 )
767 .unwrap();
768
769 let cdc_backfill_upstream_output_row_count =
770 register_guarded_int_counter_vec_with_registry!(
771 "stream_cdc_backfill_upstream_output_row_count",
772 "Total number of rows that have been output from the cdc_backfill upstream",
773 &["table_id", "actor_id"],
774 registry
775 )
776 .unwrap();
777
778 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
779 "stream_snapshot_backfill_consume_snapshot_row_count",
780 "Total number of rows that have been output from snapshot backfill",
781 &["table_id", "actor_id", "stage"],
782 registry
783 )
784 .unwrap();
785
786 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
787 "stream_over_window_cached_entry_count",
788 "Total entry (partition) count in over window executor cache",
789 &["table_id", "actor_id", "fragment_id"],
790 registry
791 )
792 .unwrap();
793
794 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
795 "stream_over_window_cache_lookup_count",
796 "Over window executor cache lookup count",
797 &["table_id", "actor_id", "fragment_id"],
798 registry
799 )
800 .unwrap();
801
802 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
803 "stream_over_window_cache_miss_count",
804 "Over window executor cache miss count",
805 &["table_id", "actor_id", "fragment_id"],
806 registry
807 )
808 .unwrap();
809
810 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
811 "stream_over_window_range_cache_entry_count",
812 "Over window partition range cache entry count",
813 &["table_id", "actor_id", "fragment_id"],
814 registry,
815 )
816 .unwrap();
817
818 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
819 "stream_over_window_range_cache_lookup_count",
820 "Over window partition range cache lookup count",
821 &["table_id", "actor_id", "fragment_id"],
822 registry
823 )
824 .unwrap();
825
826 let over_window_range_cache_left_miss_count =
827 register_guarded_int_counter_vec_with_registry!(
828 "stream_over_window_range_cache_left_miss_count",
829 "Over window partition range cache left miss count",
830 &["table_id", "actor_id", "fragment_id"],
831 registry
832 )
833 .unwrap();
834
835 let over_window_range_cache_right_miss_count =
836 register_guarded_int_counter_vec_with_registry!(
837 "stream_over_window_range_cache_right_miss_count",
838 "Over window partition range cache right miss count",
839 &["table_id", "actor_id", "fragment_id"],
840 registry
841 )
842 .unwrap();
843
844 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
845 "stream_over_window_accessed_entry_count",
846 "Over window accessed entry count",
847 &["table_id", "actor_id", "fragment_id"],
848 registry
849 )
850 .unwrap();
851
852 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
853 "stream_over_window_compute_count",
854 "Over window compute count",
855 &["table_id", "actor_id", "fragment_id"],
856 registry
857 )
858 .unwrap();
859
860 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
861 "stream_over_window_same_output_count",
862 "Over window same output count",
863 &["table_id", "actor_id", "fragment_id"],
864 registry
865 )
866 .unwrap();
867
868 let opts = histogram_opts!(
869 "stream_barrier_inflight_duration_seconds",
870 "barrier_inflight_latency",
871 exponential_buckets(0.1, 1.5, 16).unwrap() );
873 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
874
875 let opts = histogram_opts!(
876 "stream_barrier_sync_storage_duration_seconds",
877 "barrier_sync_latency",
878 exponential_buckets(0.1, 1.5, 16).unwrap() );
880 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
881
882 let opts = histogram_opts!(
883 "stream_barrier_batch_size",
884 "barrier_batch_size",
885 exponential_buckets(1.0, 2.0, 8).unwrap()
886 );
887 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
888
889 let barrier_manager_progress = register_int_counter_with_registry!(
890 "stream_barrier_manager_progress",
891 "The number of actors that have processed the earliest in-flight barriers",
892 registry
893 )
894 .unwrap();
895
896 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
897 "sync_kv_log_store_wait_next_poll_ns",
898 "Total duration (ns) of waiting for next poll",
899 &["actor_id", "target", "fragment_id", "relation"],
900 registry
901 )
902 .unwrap();
903
904 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
905 "sync_kv_log_store_read_count",
906 "read row count throughput of sync_kv log store",
907 &["type", "actor_id", "target", "fragment_id", "relation"],
908 registry
909 )
910 .unwrap();
911
912 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
913 "sync_kv_log_store_read_size",
914 "read size throughput of sync_kv log store",
915 &["type", "actor_id", "target", "fragment_id", "relation"],
916 registry
917 )
918 .unwrap();
919
920 let sync_kv_log_store_write_pause_duration_ns =
921 register_guarded_int_counter_vec_with_registry!(
922 "sync_kv_log_store_write_pause_duration_ns",
923 "Duration (ns) of sync_kv log store write pause",
924 &["actor_id", "target", "fragment_id", "relation"],
925 registry
926 )
927 .unwrap();
928
929 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
930 "sync_kv_log_store_state",
931 "clean/unclean state transition for sync_kv log store",
932 &["state", "actor_id", "target", "fragment_id", "relation"],
933 registry
934 )
935 .unwrap();
936
937 let sync_kv_log_store_storage_write_count =
938 register_guarded_int_counter_vec_with_registry!(
939 "sync_kv_log_store_storage_write_count",
940 "Write row count throughput of sync_kv log store",
941 &["actor_id", "target", "fragment_id", "relation"],
942 registry
943 )
944 .unwrap();
945
946 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
947 "sync_kv_log_store_storage_write_size",
948 "Write size throughput of sync_kv log store",
949 &["actor_id", "target", "fragment_id", "relation"],
950 registry
951 )
952 .unwrap();
953
954 let sync_kv_log_store_buffer_unconsumed_item_count =
955 register_guarded_int_gauge_vec_with_registry!(
956 "sync_kv_log_store_buffer_unconsumed_item_count",
957 "Number of Unconsumed Item in buffer",
958 &["actor_id", "target", "fragment_id", "relation"],
959 registry
960 )
961 .unwrap();
962
963 let sync_kv_log_store_buffer_unconsumed_row_count =
964 register_guarded_int_gauge_vec_with_registry!(
965 "sync_kv_log_store_buffer_unconsumed_row_count",
966 "Number of Unconsumed Row in buffer",
967 &["actor_id", "target", "fragment_id", "relation"],
968 registry
969 )
970 .unwrap();
971
972 let sync_kv_log_store_buffer_unconsumed_epoch_count =
973 register_guarded_int_gauge_vec_with_registry!(
974 "sync_kv_log_store_buffer_unconsumed_epoch_count",
975 "Number of Unconsumed Epoch in buffer",
976 &["actor_id", "target", "fragment_id", "relation"],
977 registry
978 )
979 .unwrap();
980
981 let sync_kv_log_store_buffer_unconsumed_min_epoch =
982 register_guarded_int_gauge_vec_with_registry!(
983 "sync_kv_log_store_buffer_unconsumed_min_epoch",
984 "Number of Unconsumed Epoch in buffer",
985 &["actor_id", "target", "fragment_id", "relation"],
986 registry
987 )
988 .unwrap();
989
990 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
991 "kv_log_store_storage_write_count",
992 "Write row count throughput of kv log store",
993 &["actor_id", "connector", "sink_id", "sink_name"],
994 registry
995 )
996 .unwrap();
997
998 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
999 "kv_log_store_storage_write_size",
1000 "Write size throughput of kv log store",
1001 &["actor_id", "connector", "sink_id", "sink_name"],
1002 registry
1003 )
1004 .unwrap();
1005
1006 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1007 "kv_log_store_storage_read_count",
1008 "Write row count throughput of kv log store",
1009 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1010 registry
1011 )
1012 .unwrap();
1013
1014 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1015 "kv_log_store_storage_read_size",
1016 "Write size throughput of kv log store",
1017 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1018 registry
1019 )
1020 .unwrap();
1021
1022 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1023 "kv_log_store_rewind_count",
1024 "Kv log store rewind rate",
1025 &["actor_id", "connector", "sink_id", "sink_name"],
1026 registry
1027 )
1028 .unwrap();
1029
1030 let kv_log_store_rewind_delay_opts = {
1031 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1032 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1033 - REWIND_BASE_DELAY.as_secs_f64().log2())
1034 .ceil() as usize;
1035 let buckets = exponential_buckets(
1036 REWIND_BASE_DELAY.as_secs_f64(),
1037 REWIND_BACKOFF_FACTOR as _,
1038 bucket_count,
1039 )
1040 .unwrap();
1041 histogram_opts!(
1042 "kv_log_store_rewind_delay",
1043 "Kv log store rewind delay",
1044 buckets,
1045 )
1046 };
1047
1048 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1049 kv_log_store_rewind_delay_opts,
1050 &["actor_id", "connector", "sink_id", "sink_name"],
1051 registry
1052 )
1053 .unwrap();
1054
1055 let kv_log_store_buffer_unconsumed_item_count =
1056 register_guarded_int_gauge_vec_with_registry!(
1057 "kv_log_store_buffer_unconsumed_item_count",
1058 "Number of Unconsumed Item in buffer",
1059 &["actor_id", "connector", "sink_id", "sink_name"],
1060 registry
1061 )
1062 .unwrap();
1063
1064 let kv_log_store_buffer_unconsumed_row_count =
1065 register_guarded_int_gauge_vec_with_registry!(
1066 "kv_log_store_buffer_unconsumed_row_count",
1067 "Number of Unconsumed Row in buffer",
1068 &["actor_id", "connector", "sink_id", "sink_name"],
1069 registry
1070 )
1071 .unwrap();
1072
1073 let kv_log_store_buffer_unconsumed_epoch_count =
1074 register_guarded_int_gauge_vec_with_registry!(
1075 "kv_log_store_buffer_unconsumed_epoch_count",
1076 "Number of Unconsumed Epoch in buffer",
1077 &["actor_id", "connector", "sink_id", "sink_name"],
1078 registry
1079 )
1080 .unwrap();
1081
1082 let kv_log_store_buffer_unconsumed_min_epoch =
1083 register_guarded_int_gauge_vec_with_registry!(
1084 "kv_log_store_buffer_unconsumed_min_epoch",
1085 "Number of Unconsumed Epoch in buffer",
1086 &["actor_id", "connector", "sink_id", "sink_name"],
1087 registry
1088 )
1089 .unwrap();
1090
1091 let lru_runtime_loop_count = register_int_counter_with_registry!(
1092 "lru_runtime_loop_count",
1093 "The counts of the eviction loop in LRU manager per second",
1094 registry
1095 )
1096 .unwrap();
1097
1098 let lru_latest_sequence = register_int_gauge_with_registry!(
1099 "lru_latest_sequence",
1100 "Current LRU global sequence",
1101 registry,
1102 )
1103 .unwrap();
1104
1105 let lru_watermark_sequence = register_int_gauge_with_registry!(
1106 "lru_watermark_sequence",
1107 "Current LRU watermark sequence",
1108 registry,
1109 )
1110 .unwrap();
1111
1112 let lru_eviction_policy = register_int_gauge_with_registry!(
1113 "lru_eviction_policy",
1114 "Current LRU eviction policy",
1115 registry,
1116 )
1117 .unwrap();
1118
1119 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1120 "jemalloc_allocated_bytes",
1121 "The allocated memory jemalloc, got from jemalloc_ctl",
1122 registry
1123 )
1124 .unwrap();
1125
1126 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1127 "jemalloc_active_bytes",
1128 "The active memory jemalloc, got from jemalloc_ctl",
1129 registry
1130 )
1131 .unwrap();
1132
1133 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1134 "jemalloc_resident_bytes",
1135 "The active memory jemalloc, got from jemalloc_ctl",
1136 registry
1137 )
1138 .unwrap();
1139
1140 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1141 "jemalloc_metadata_bytes",
1142 "The active memory jemalloc, got from jemalloc_ctl",
1143 registry
1144 )
1145 .unwrap();
1146
1147 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1148 "jvm_allocated_bytes",
1149 "The allocated jvm memory",
1150 registry
1151 )
1152 .unwrap();
1153
1154 let jvm_active_bytes = register_int_gauge_with_registry!(
1155 "jvm_active_bytes",
1156 "The active jvm memory",
1157 registry
1158 )
1159 .unwrap();
1160
1161 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1162 "stream_materialize_cache_hit_count",
1163 "Materialize executor cache hit count",
1164 &["actor_id", "table_id", "fragment_id"],
1165 registry
1166 )
1167 .unwrap()
1168 .relabel_debug_1(level);
1169
1170 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1171 "stream_materialize_data_exist_count",
1172 "Materialize executor data exist count",
1173 &["actor_id", "table_id", "fragment_id"],
1174 registry
1175 )
1176 .unwrap()
1177 .relabel_debug_1(level);
1178
1179 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1180 "stream_materialize_cache_total_count",
1181 "Materialize executor cache total operation",
1182 &["actor_id", "table_id", "fragment_id"],
1183 registry
1184 )
1185 .unwrap()
1186 .relabel_debug_1(level);
1187
1188 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1189 "stream_memory_usage",
1190 "Memory usage for stream executors",
1191 &["actor_id", "table_id", "desc"],
1192 registry
1193 )
1194 .unwrap()
1195 .relabel_debug_1(level);
1196
1197 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1198 "gap_fill_generated_rows_count",
1199 "Total number of rows generated by gap fill executor",
1200 &["actor_id", "fragment_id"],
1201 registry
1202 )
1203 .unwrap()
1204 .relabel_debug_1(level);
1205
1206 let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1207 "state_table_iter_count",
1208 "Total number of state table iter operations",
1209 &["actor_id", "fragment_id", "table_id"],
1210 registry
1211 )
1212 .unwrap()
1213 .relabel_debug_1(level);
1214
1215 let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1216 "state_table_get_count",
1217 "Total number of state table get operations",
1218 &["actor_id", "fragment_id", "table_id"],
1219 registry
1220 )
1221 .unwrap()
1222 .relabel_debug_1(level);
1223
1224 let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1225 "state_table_iter_vnode_pruned_count",
1226 "Total number of state table iter operations pruned by vnode statistics",
1227 &["actor_id", "fragment_id", "table_id"],
1228 registry
1229 )
1230 .unwrap()
1231 .relabel_debug_1(level);
1232
1233 let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1234 "state_table_get_vnode_pruned_count",
1235 "Total number of state table get operations pruned by vnode statistics",
1236 &["actor_id", "fragment_id", "table_id"],
1237 registry
1238 )
1239 .unwrap()
1240 .relabel_debug_1(level);
1241
1242 Self {
1243 level,
1244 executor_row_count,
1245 mem_stream_node_output_row_count: stream_node_output_row_count,
1246 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1247 actor_scheduled_duration,
1248 actor_scheduled_cnt,
1249 actor_poll_duration,
1250 actor_poll_cnt,
1251 actor_idle_duration,
1252 actor_idle_cnt,
1253 actor_count,
1254 actor_in_record_cnt,
1255 actor_out_record_cnt,
1256 actor_current_epoch,
1257 source_output_row_count,
1258 source_split_change_count,
1259 source_backfill_row_count,
1260 sink_input_row_count,
1261 sink_input_bytes,
1262 sink_chunk_buffer_size,
1263 exchange_frag_recv_size,
1264 merge_barrier_align_duration,
1265 actor_output_buffer_blocking_duration_ns,
1266 actor_input_buffer_blocking_duration_ns,
1267 join_lookup_miss_count,
1268 join_lookup_total_count,
1269 join_insert_cache_miss_count,
1270 join_actor_input_waiting_duration_ns,
1271 join_match_duration_ns,
1272 join_cached_entry_count,
1273 join_matched_join_keys,
1274 barrier_align_duration,
1275 agg_lookup_miss_count,
1276 agg_total_lookup_count,
1277 agg_cached_entry_count,
1278 agg_chunk_lookup_miss_count,
1279 agg_chunk_total_lookup_count,
1280 agg_dirty_groups_count,
1281 agg_dirty_groups_heap_size,
1282 agg_distinct_cache_miss_count,
1283 agg_distinct_total_cache_count,
1284 agg_distinct_cached_entry_count,
1285 agg_state_cache_lookup_count,
1286 agg_state_cache_miss_count,
1287 group_top_n_cache_miss_count,
1288 group_top_n_total_query_cache_count,
1289 group_top_n_cached_entry_count,
1290 group_top_n_appendonly_cache_miss_count,
1291 group_top_n_appendonly_total_query_cache_count,
1292 group_top_n_appendonly_cached_entry_count,
1293 lookup_cache_miss_count,
1294 lookup_total_query_cache_count,
1295 lookup_cached_entry_count,
1296 temporal_join_cache_miss_count,
1297 temporal_join_total_query_cache_count,
1298 temporal_join_cached_entry_count,
1299 backfill_snapshot_read_row_count,
1300 backfill_upstream_output_row_count,
1301 cdc_backfill_snapshot_read_row_count,
1302 cdc_backfill_upstream_output_row_count,
1303 snapshot_backfill_consume_row_count,
1304 over_window_cached_entry_count,
1305 over_window_cache_lookup_count,
1306 over_window_cache_miss_count,
1307 over_window_range_cache_entry_count,
1308 over_window_range_cache_lookup_count,
1309 over_window_range_cache_left_miss_count,
1310 over_window_range_cache_right_miss_count,
1311 over_window_accessed_entry_count,
1312 over_window_compute_count,
1313 over_window_same_output_count,
1314 barrier_inflight_latency,
1315 barrier_sync_latency,
1316 barrier_batch_size,
1317 barrier_manager_progress,
1318 kv_log_store_storage_write_count,
1319 kv_log_store_storage_write_size,
1320 kv_log_store_rewind_count,
1321 kv_log_store_rewind_delay,
1322 kv_log_store_storage_read_count,
1323 kv_log_store_storage_read_size,
1324 kv_log_store_buffer_unconsumed_item_count,
1325 kv_log_store_buffer_unconsumed_row_count,
1326 kv_log_store_buffer_unconsumed_epoch_count,
1327 kv_log_store_buffer_unconsumed_min_epoch,
1328 sync_kv_log_store_read_count,
1329 sync_kv_log_store_read_size,
1330 sync_kv_log_store_write_pause_duration_ns,
1331 sync_kv_log_store_state,
1332 sync_kv_log_store_wait_next_poll_ns,
1333 sync_kv_log_store_storage_write_count,
1334 sync_kv_log_store_storage_write_size,
1335 sync_kv_log_store_buffer_unconsumed_item_count,
1336 sync_kv_log_store_buffer_unconsumed_row_count,
1337 sync_kv_log_store_buffer_unconsumed_epoch_count,
1338 sync_kv_log_store_buffer_unconsumed_min_epoch,
1339 lru_runtime_loop_count,
1340 lru_latest_sequence,
1341 lru_watermark_sequence,
1342 lru_eviction_policy,
1343 jemalloc_allocated_bytes,
1344 jemalloc_active_bytes,
1345 jemalloc_resident_bytes,
1346 jemalloc_metadata_bytes,
1347 jvm_allocated_bytes,
1348 jvm_active_bytes,
1349 stream_memory_usage,
1350 materialize_cache_hit_count,
1351 materialize_data_exist_count,
1352 materialize_cache_total_count,
1353 materialize_input_row_count,
1354 materialize_current_epoch,
1355 pg_cdc_state_table_lsn,
1356 pg_cdc_jni_commit_offset_lsn,
1357 mysql_cdc_state_binlog_file_seq,
1358 mysql_cdc_state_binlog_position,
1359 gap_fill_generated_rows_count,
1360 state_table_iter_count,
1361 state_table_get_count,
1362 state_table_iter_vnode_pruned_count,
1363 state_table_get_vnode_pruned_count,
1364 }
1365 }
1366
1367 pub fn unused() -> Self {
1369 global_streaming_metrics(MetricLevel::Disabled)
1370 }
1371
1372 pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1373 let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1374 let actor_scheduled_duration = self
1375 .actor_scheduled_duration
1376 .with_guarded_label_values(label_list);
1377 let actor_scheduled_cnt = self
1378 .actor_scheduled_cnt
1379 .with_guarded_label_values(label_list);
1380 let actor_poll_duration = self
1381 .actor_poll_duration
1382 .with_guarded_label_values(label_list);
1383 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1384 let actor_idle_duration = self
1385 .actor_idle_duration
1386 .with_guarded_label_values(label_list);
1387 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1388 ActorMetrics {
1389 actor_scheduled_duration,
1390 actor_scheduled_cnt,
1391 actor_poll_duration,
1392 actor_poll_cnt,
1393 actor_idle_duration,
1394 actor_idle_cnt,
1395 }
1396 }
1397
1398 pub(crate) fn new_actor_input_metrics(
1399 &self,
1400 actor_id: ActorId,
1401 fragment_id: FragmentId,
1402 upstream_fragment_id: FragmentId,
1403 ) -> ActorInputMetrics {
1404 let actor_id_str = actor_id.to_string();
1405 let fragment_id_str = fragment_id.to_string();
1406 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1407 ActorInputMetrics {
1408 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1409 &actor_id_str,
1410 &fragment_id_str,
1411 &upstream_fragment_id_str,
1412 ]),
1413 actor_input_buffer_blocking_duration_ns: self
1414 .actor_input_buffer_blocking_duration_ns
1415 .with_guarded_label_values(&[
1416 &actor_id_str,
1417 &fragment_id_str,
1418 &upstream_fragment_id_str,
1419 ]),
1420 }
1421 }
1422
1423 pub fn new_sink_exec_metrics(
1424 &self,
1425 id: SinkId,
1426 actor_id: ActorId,
1427 fragment_id: FragmentId,
1428 ) -> SinkExecutorMetrics {
1429 let label_list: &[&str; 3] = &[
1430 &id.to_string(),
1431 &actor_id.to_string(),
1432 &fragment_id.to_string(),
1433 ];
1434 SinkExecutorMetrics {
1435 sink_input_row_count: self
1436 .sink_input_row_count
1437 .with_guarded_label_values(label_list),
1438 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1439 sink_chunk_buffer_size: self
1440 .sink_chunk_buffer_size
1441 .with_guarded_label_values(label_list),
1442 }
1443 }
1444
1445 pub fn new_group_top_n_metrics(
1446 &self,
1447 table_id: TableId,
1448 actor_id: ActorId,
1449 fragment_id: FragmentId,
1450 ) -> GroupTopNMetrics {
1451 let label_list: &[&str; 3] = &[
1452 &table_id.to_string(),
1453 &actor_id.to_string(),
1454 &fragment_id.to_string(),
1455 ];
1456
1457 GroupTopNMetrics {
1458 group_top_n_cache_miss_count: self
1459 .group_top_n_cache_miss_count
1460 .with_guarded_label_values(label_list),
1461 group_top_n_total_query_cache_count: self
1462 .group_top_n_total_query_cache_count
1463 .with_guarded_label_values(label_list),
1464 group_top_n_cached_entry_count: self
1465 .group_top_n_cached_entry_count
1466 .with_guarded_label_values(label_list),
1467 }
1468 }
1469
1470 pub fn new_append_only_group_top_n_metrics(
1471 &self,
1472 table_id: TableId,
1473 actor_id: ActorId,
1474 fragment_id: FragmentId,
1475 ) -> GroupTopNMetrics {
1476 let label_list: &[&str; 3] = &[
1477 &table_id.to_string(),
1478 &actor_id.to_string(),
1479 &fragment_id.to_string(),
1480 ];
1481
1482 GroupTopNMetrics {
1483 group_top_n_cache_miss_count: self
1484 .group_top_n_appendonly_cache_miss_count
1485 .with_guarded_label_values(label_list),
1486 group_top_n_total_query_cache_count: self
1487 .group_top_n_appendonly_total_query_cache_count
1488 .with_guarded_label_values(label_list),
1489 group_top_n_cached_entry_count: self
1490 .group_top_n_appendonly_cached_entry_count
1491 .with_guarded_label_values(label_list),
1492 }
1493 }
1494
1495 pub fn new_lookup_executor_metrics(
1496 &self,
1497 table_id: TableId,
1498 actor_id: ActorId,
1499 fragment_id: FragmentId,
1500 ) -> LookupExecutorMetrics {
1501 let label_list: &[&str; 3] = &[
1502 &table_id.to_string(),
1503 &actor_id.to_string(),
1504 &fragment_id.to_string(),
1505 ];
1506
1507 LookupExecutorMetrics {
1508 lookup_cache_miss_count: self
1509 .lookup_cache_miss_count
1510 .with_guarded_label_values(label_list),
1511 lookup_total_query_cache_count: self
1512 .lookup_total_query_cache_count
1513 .with_guarded_label_values(label_list),
1514 lookup_cached_entry_count: self
1515 .lookup_cached_entry_count
1516 .with_guarded_label_values(label_list),
1517 }
1518 }
1519
1520 pub fn new_hash_agg_metrics(
1521 &self,
1522 table_id: TableId,
1523 actor_id: ActorId,
1524 fragment_id: FragmentId,
1525 ) -> HashAggMetrics {
1526 let label_list: &[&str; 3] = &[
1527 &table_id.to_string(),
1528 &actor_id.to_string(),
1529 &fragment_id.to_string(),
1530 ];
1531 HashAggMetrics {
1532 agg_lookup_miss_count: self
1533 .agg_lookup_miss_count
1534 .with_guarded_label_values(label_list),
1535 agg_total_lookup_count: self
1536 .agg_total_lookup_count
1537 .with_guarded_label_values(label_list),
1538 agg_cached_entry_count: self
1539 .agg_cached_entry_count
1540 .with_guarded_label_values(label_list),
1541 agg_chunk_lookup_miss_count: self
1542 .agg_chunk_lookup_miss_count
1543 .with_guarded_label_values(label_list),
1544 agg_chunk_total_lookup_count: self
1545 .agg_chunk_total_lookup_count
1546 .with_guarded_label_values(label_list),
1547 agg_dirty_groups_count: self
1548 .agg_dirty_groups_count
1549 .with_guarded_label_values(label_list),
1550 agg_dirty_groups_heap_size: self
1551 .agg_dirty_groups_heap_size
1552 .with_guarded_label_values(label_list),
1553 agg_state_cache_lookup_count: self
1554 .agg_state_cache_lookup_count
1555 .with_guarded_label_values(label_list),
1556 agg_state_cache_miss_count: self
1557 .agg_state_cache_miss_count
1558 .with_guarded_label_values(label_list),
1559 }
1560 }
1561
1562 pub fn new_agg_distinct_dedup_metrics(
1563 &self,
1564 table_id: TableId,
1565 actor_id: ActorId,
1566 fragment_id: FragmentId,
1567 ) -> AggDistinctDedupMetrics {
1568 let label_list: &[&str; 3] = &[
1569 &table_id.to_string(),
1570 &actor_id.to_string(),
1571 &fragment_id.to_string(),
1572 ];
1573 AggDistinctDedupMetrics {
1574 agg_distinct_cache_miss_count: self
1575 .agg_distinct_cache_miss_count
1576 .with_guarded_label_values(label_list),
1577 agg_distinct_total_cache_count: self
1578 .agg_distinct_total_cache_count
1579 .with_guarded_label_values(label_list),
1580 agg_distinct_cached_entry_count: self
1581 .agg_distinct_cached_entry_count
1582 .with_guarded_label_values(label_list),
1583 }
1584 }
1585
1586 pub fn new_temporal_join_metrics(
1587 &self,
1588 table_id: TableId,
1589 actor_id: ActorId,
1590 fragment_id: FragmentId,
1591 ) -> TemporalJoinMetrics {
1592 let label_list: &[&str; 3] = &[
1593 &table_id.to_string(),
1594 &actor_id.to_string(),
1595 &fragment_id.to_string(),
1596 ];
1597 TemporalJoinMetrics {
1598 temporal_join_cache_miss_count: self
1599 .temporal_join_cache_miss_count
1600 .with_guarded_label_values(label_list),
1601 temporal_join_total_query_cache_count: self
1602 .temporal_join_total_query_cache_count
1603 .with_guarded_label_values(label_list),
1604 temporal_join_cached_entry_count: self
1605 .temporal_join_cached_entry_count
1606 .with_guarded_label_values(label_list),
1607 }
1608 }
1609
1610 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1611 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1612 BackfillMetrics {
1613 backfill_snapshot_read_row_count: self
1614 .backfill_snapshot_read_row_count
1615 .with_guarded_label_values(label_list),
1616 backfill_upstream_output_row_count: self
1617 .backfill_upstream_output_row_count
1618 .with_guarded_label_values(label_list),
1619 }
1620 }
1621
1622 pub fn new_cdc_backfill_metrics(
1623 &self,
1624 table_id: TableId,
1625 actor_id: ActorId,
1626 ) -> CdcBackfillMetrics {
1627 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1628 CdcBackfillMetrics {
1629 cdc_backfill_snapshot_read_row_count: self
1630 .cdc_backfill_snapshot_read_row_count
1631 .with_guarded_label_values(label_list),
1632 cdc_backfill_upstream_output_row_count: self
1633 .cdc_backfill_upstream_output_row_count
1634 .with_guarded_label_values(label_list),
1635 }
1636 }
1637
1638 pub fn new_over_window_metrics(
1639 &self,
1640 table_id: TableId,
1641 actor_id: ActorId,
1642 fragment_id: FragmentId,
1643 ) -> OverWindowMetrics {
1644 let label_list: &[&str; 3] = &[
1645 &table_id.to_string(),
1646 &actor_id.to_string(),
1647 &fragment_id.to_string(),
1648 ];
1649 OverWindowMetrics {
1650 over_window_cached_entry_count: self
1651 .over_window_cached_entry_count
1652 .with_guarded_label_values(label_list),
1653 over_window_cache_lookup_count: self
1654 .over_window_cache_lookup_count
1655 .with_guarded_label_values(label_list),
1656 over_window_cache_miss_count: self
1657 .over_window_cache_miss_count
1658 .with_guarded_label_values(label_list),
1659 over_window_range_cache_entry_count: self
1660 .over_window_range_cache_entry_count
1661 .with_guarded_label_values(label_list),
1662 over_window_range_cache_lookup_count: self
1663 .over_window_range_cache_lookup_count
1664 .with_guarded_label_values(label_list),
1665 over_window_range_cache_left_miss_count: self
1666 .over_window_range_cache_left_miss_count
1667 .with_guarded_label_values(label_list),
1668 over_window_range_cache_right_miss_count: self
1669 .over_window_range_cache_right_miss_count
1670 .with_guarded_label_values(label_list),
1671 over_window_accessed_entry_count: self
1672 .over_window_accessed_entry_count
1673 .with_guarded_label_values(label_list),
1674 over_window_compute_count: self
1675 .over_window_compute_count
1676 .with_guarded_label_values(label_list),
1677 over_window_same_output_count: self
1678 .over_window_same_output_count
1679 .with_guarded_label_values(label_list),
1680 }
1681 }
1682
1683 pub fn new_materialize_cache_metrics(
1684 &self,
1685 table_id: TableId,
1686 actor_id: ActorId,
1687 fragment_id: FragmentId,
1688 ) -> MaterializeCacheMetrics {
1689 let label_list: &[&str; 3] = &[
1690 &actor_id.to_string(),
1691 &table_id.to_string(),
1692 &fragment_id.to_string(),
1693 ];
1694 MaterializeCacheMetrics {
1695 materialize_cache_hit_count: self
1696 .materialize_cache_hit_count
1697 .with_guarded_label_values(label_list),
1698 materialize_data_exist_count: self
1699 .materialize_data_exist_count
1700 .with_guarded_label_values(label_list),
1701 materialize_cache_total_count: self
1702 .materialize_cache_total_count
1703 .with_guarded_label_values(label_list),
1704 }
1705 }
1706
1707 pub fn new_materialize_metrics(
1708 &self,
1709 table_id: TableId,
1710 actor_id: ActorId,
1711 fragment_id: FragmentId,
1712 ) -> MaterializeMetrics {
1713 let label_list: &[&str; 3] = &[
1714 &actor_id.to_string(),
1715 &table_id.to_string(),
1716 &fragment_id.to_string(),
1717 ];
1718 MaterializeMetrics {
1719 materialize_input_row_count: self
1720 .materialize_input_row_count
1721 .with_guarded_label_values(label_list),
1722 materialize_current_epoch: self
1723 .materialize_current_epoch
1724 .with_guarded_label_values(label_list),
1725 }
1726 }
1727
1728 pub fn new_state_table_metrics(
1729 &self,
1730 table_id: TableId,
1731 actor_id: ActorId,
1732 fragment_id: FragmentId,
1733 ) -> StateTableMetrics {
1734 let label_list: &[&str; 3] = &[
1735 &actor_id.to_string(),
1736 &fragment_id.to_string(),
1737 &table_id.to_string(),
1738 ];
1739 StateTableMetrics {
1740 iter_count: self
1741 .state_table_iter_count
1742 .with_guarded_label_values(label_list),
1743 get_count: self
1744 .state_table_get_count
1745 .with_guarded_label_values(label_list),
1746 iter_vnode_pruned_count: self
1747 .state_table_iter_vnode_pruned_count
1748 .with_guarded_label_values(label_list),
1749 get_vnode_pruned_count: self
1750 .state_table_get_vnode_pruned_count
1751 .with_guarded_label_values(label_list),
1752 }
1753 }
1754}
1755
/// Metrics for a single upstream input of an actor.
///
/// Produced by [`StreamingMetrics::new_actor_input_metrics`], which selects both counters with
/// the label values `[actor_id, fragment_id, upstream_fragment_id]`.
pub(crate) struct ActorInputMetrics {
    /// Series of the `actor_in_record_cnt` vector for this input.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Series of the `actor_input_buffer_blocking_duration_ns` vector for this input
    /// (unit presumably nanoseconds, per the name; not visible here).
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1760
/// Per-actor scheduling metrics.
///
/// Produced by [`StreamingMetrics::new_actor_metrics`]. Every field is the series of the
/// same-named vector selected by `[actor_id, fragment_id]`.
pub struct ActorMetrics {
    /// Accumulated scheduled duration of the actor.
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    /// Number of times the actor was scheduled.
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    /// Accumulated poll duration of the actor.
    pub actor_poll_duration: LabelGuardedIntCounter,
    /// Number of polls of the actor.
    pub actor_poll_cnt: LabelGuardedIntCounter,
    /// Accumulated idle duration of the actor.
    pub actor_idle_duration: LabelGuardedIntCounter,
    /// Number of idle periods of the actor.
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1770
/// Metrics for a sink executor.
///
/// Produced by [`StreamingMetrics::new_sink_exec_metrics`], selected by the sink id, actor id
/// and fragment id (in that order).
pub struct SinkExecutorMetrics {
    /// Counter of rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter of bytes fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge for the sink's chunk buffer size.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1776
/// Cache statistics for a materialize executor.
///
/// Produced by [`StreamingMetrics::new_materialize_cache_metrics`], selected by
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeCacheMetrics {
    /// Series of `stream_materialize_cache_hit_count`.
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Series of `stream_materialize_data_exist_count`.
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Series of `stream_materialize_cache_total_count`.
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1782
/// Input and progress metrics for a materialize executor.
///
/// Produced by [`StreamingMetrics::new_materialize_metrics`], selected by
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeMetrics {
    /// Counter of rows received by the materialize executor.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge holding the executor's current epoch value.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1787
/// Cache statistics for a group top-n executor.
///
/// Shared by both [`StreamingMetrics::new_group_top_n_metrics`] and
/// [`StreamingMetrics::new_append_only_group_top_n_metrics`]. The latter backs these fields
/// with the `group_top_n_appendonly_*` vectors. Both select by
/// `[table_id, actor_id, fragment_id]`.
pub struct GroupTopNMetrics {
    /// Counter of cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries.
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of cached entries.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1793
/// Cache statistics for a lookup executor.
///
/// Produced by [`StreamingMetrics::new_lookup_executor_metrics`], selected by
/// `[table_id, actor_id, fragment_id]`.
pub struct LookupExecutorMetrics {
    /// Counter of cache misses.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of cached entries.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1799
/// Metrics for a hash aggregation executor.
///
/// Produced by [`StreamingMetrics::new_hash_agg_metrics`], selected by
/// `[table_id, actor_id, fragment_id]`.
pub struct HashAggMetrics {
    /// Counter of lookup misses.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter of total lookups.
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge of cached entries.
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    /// Chunk-level lookup miss counter.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    /// Chunk-level total lookup counter.
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge of dirty groups.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    /// Gauge of the heap size attributed to dirty groups.
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    /// Counter of state-cache lookups.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter of state-cache misses.
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1811
/// Cache statistics for the distinct-deduplication part of an aggregation.
///
/// Produced by [`StreamingMetrics::new_agg_distinct_dedup_metrics`], selected by
/// `[table_id, actor_id, fragment_id]`.
pub struct AggDistinctDedupMetrics {
    /// Counter of cache misses.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache accesses.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge of cached entries.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1817
/// Cache statistics for a temporal join executor.
///
/// Produced by [`StreamingMetrics::new_temporal_join_metrics`], selected by
/// `[table_id, actor_id, fragment_id]`.
pub struct TemporalJoinMetrics {
    /// Counter of cache misses.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of cached entries.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1823
/// Row-count metrics for a backfill executor.
///
/// Produced by [`StreamingMetrics::new_backfill_metrics`], selected by `[table_id, actor_id]`.
pub struct BackfillMetrics {
    /// Counter of rows read from the snapshot.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows output from upstream.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1828
/// Row-count metrics for a CDC backfill executor.
///
/// Produced by [`StreamingMetrics::new_cdc_backfill_metrics`], selected by
/// `[table_id, actor_id]`. It is the only metrics bundle in this section that derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter of rows read from the snapshot.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows output from upstream.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1834
/// Metrics for an over-window executor.
///
/// Produced by [`StreamingMetrics::new_over_window_metrics`], selected by
/// `[table_id, actor_id, fragment_id]`.
pub struct OverWindowMetrics {
    // Partition cache.
    /// Gauge of cached entries.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    /// Counter of cache lookups.
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter of cache misses.
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    // Range cache.
    /// Gauge of range-cache entries.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    /// Counter of range-cache lookups.
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter of range-cache misses on the left side.
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    /// Counter of range-cache misses on the right side.
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    // Compute statistics.
    /// Counter of accessed entries.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    /// Counter of computations.
    pub over_window_compute_count: LabelGuardedIntCounter,
    /// Counter of computations whose output was the same (semantics per the metric's
    /// registration; not visible here).
    pub over_window_same_output_count: LabelGuardedIntCounter,
}
1847
/// Operation counters for a state table.
///
/// Produced by [`StreamingMetrics::new_state_table_metrics`], selected by
/// `[actor_id, fragment_id, table_id]`.
pub struct StateTableMetrics {
    /// Series of `state_table_iter_count`: total state table iter operations.
    pub iter_count: LabelGuardedIntCounter,
    /// Series of `state_table_get_count`: total state table get operations.
    pub get_count: LabelGuardedIntCounter,
    /// Series of `state_table_iter_vnode_pruned_count`: iter operations pruned by vnode
    /// statistics.
    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
    /// Series of `state_table_get_vnode_pruned_count`: get operations pruned by vnode
    /// statistics.
    pub get_vnode_pruned_count: LabelGuardedIntCounter,
}