1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45 pub level: MetricLevel,
46
47 pub executor_row_count: RelabeledGuardedIntCounterVec,
49
50 pub mem_stream_node_output_row_count: CountMap,
54 pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56 actor_execution_time: LabelGuardedGaugeVec,
58 actor_scheduled_duration: LabelGuardedGaugeVec,
59 actor_scheduled_cnt: LabelGuardedIntGaugeVec,
60 actor_fast_poll_duration: LabelGuardedGaugeVec,
61 actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
62 actor_slow_poll_duration: LabelGuardedGaugeVec,
63 actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
64 actor_poll_duration: LabelGuardedGaugeVec,
65 actor_poll_cnt: LabelGuardedIntGaugeVec,
66 actor_idle_duration: LabelGuardedGaugeVec,
67 actor_idle_cnt: LabelGuardedIntGaugeVec,
68
69 pub actor_count: LabelGuardedIntGaugeVec,
71 pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
72 pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
73 pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
74
75 pub source_output_row_count: LabelGuardedIntCounterVec,
77 pub source_split_change_count: LabelGuardedIntCounterVec,
78 pub source_backfill_row_count: LabelGuardedIntCounterVec,
79
80 sink_input_row_count: LabelGuardedIntCounterVec,
82 sink_input_bytes: LabelGuardedIntCounterVec,
83 sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
84
85 pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
87
88 pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,
91
92 pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
94 actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
95
96 pub join_lookup_miss_count: LabelGuardedIntCounterVec,
98 pub join_lookup_total_count: LabelGuardedIntCounterVec,
99 pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
100 pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
101 pub join_match_duration_ns: LabelGuardedIntCounterVec,
102 pub join_cached_entry_count: LabelGuardedIntGaugeVec,
103 pub join_matched_join_keys: RelabeledGuardedHistogramVec,
104
105 pub barrier_align_duration: RelabeledGuardedIntCounterVec,
107
108 agg_lookup_miss_count: LabelGuardedIntCounterVec,
110 agg_total_lookup_count: LabelGuardedIntCounterVec,
111 agg_cached_entry_count: LabelGuardedIntGaugeVec,
112 agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
113 agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
114 agg_dirty_groups_count: LabelGuardedIntGaugeVec,
115 agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
116 agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
117 agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
118 agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
119 agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
120 agg_state_cache_miss_count: LabelGuardedIntCounterVec,
121
122 group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
124 group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
125 group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
126 group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
128 group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
129 group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
130
131 lookup_cache_miss_count: LabelGuardedIntCounterVec,
133 lookup_total_query_cache_count: LabelGuardedIntCounterVec,
134 lookup_cached_entry_count: LabelGuardedIntGaugeVec,
135
136 temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
138 temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
139 temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
140
141 backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143 backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145 cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
147 cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
148
149 pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
151
152 over_window_cached_entry_count: LabelGuardedIntGaugeVec,
154 over_window_cache_lookup_count: LabelGuardedIntCounterVec,
155 over_window_cache_miss_count: LabelGuardedIntCounterVec,
156 over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
157 over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
158 over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
159 over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
160 over_window_accessed_entry_count: LabelGuardedIntCounterVec,
161 over_window_compute_count: LabelGuardedIntCounterVec,
162 over_window_same_output_count: LabelGuardedIntCounterVec,
163
164 pub barrier_inflight_latency: Histogram,
168 pub barrier_sync_latency: Histogram,
170 pub barrier_batch_size: Histogram,
171 pub barrier_manager_progress: IntCounter,
173
174 pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
175 pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
176 pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
177 pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
178 pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
179 pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
180 pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
181 pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
182 pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
183 pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
184
185 pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186 pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187 pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188 pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189 pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190 pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191 pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192 pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193 pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194 pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195 pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196
197 pub lru_runtime_loop_count: IntCounter,
199 pub lru_latest_sequence: IntGauge,
200 pub lru_watermark_sequence: IntGauge,
201 pub lru_eviction_policy: IntGauge,
202 pub jemalloc_allocated_bytes: IntGauge,
203 pub jemalloc_active_bytes: IntGauge,
204 pub jemalloc_resident_bytes: IntGauge,
205 pub jemalloc_metadata_bytes: IntGauge,
206 pub jvm_allocated_bytes: IntGauge,
207 pub jvm_active_bytes: IntGauge,
208 pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210 materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212 materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213 materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214 materialize_input_row_count: RelabeledGuardedIntCounterVec,
215 pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217 pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219 pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221 pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
223 pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
224
225 pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
227}
228
229pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
230
231pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
232 GLOBAL_STREAMING_METRICS
233 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
234 .clone()
235}
236
237impl StreamingMetrics {
238 fn new(registry: &Registry, level: MetricLevel) -> Self {
239 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
240 "stream_executor_row_count",
241 "Total number of rows that have been output from each executor",
242 &["actor_id", "fragment_id", "executor_identity"],
243 registry
244 )
245 .unwrap()
246 .relabel_debug_1(level);
247
248 let stream_node_output_row_count = CountMap::new();
249 let stream_node_output_blocking_duration_ns = CountMap::new();
250
251 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
252 "stream_source_output_rows_counts",
253 "Total number of rows that have been output from source",
254 &["source_id", "source_name", "actor_id", "fragment_id"],
255 registry
256 )
257 .unwrap();
258
259 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
260 "stream_source_split_change_event_count",
261 "Total number of split change events that have been operated by source",
262 &["source_id", "source_name", "actor_id", "fragment_id"],
263 registry
264 )
265 .unwrap();
266
267 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
268 "stream_source_backfill_rows_counts",
269 "Total number of rows that have been backfilled for source",
270 &["source_id", "source_name", "actor_id", "fragment_id"],
271 registry
272 )
273 .unwrap();
274
275 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
276 "stream_sink_input_row_count",
277 "Total number of rows streamed into sink executors",
278 &["sink_id", "actor_id", "fragment_id"],
279 registry
280 )
281 .unwrap();
282
283 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
284 "stream_sink_input_bytes",
285 "Total size of chunks streamed into sink executors",
286 &["sink_id", "actor_id", "fragment_id"],
287 registry
288 )
289 .unwrap();
290
291 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
292 "stream_mview_input_row_count",
293 "Total number of rows streamed into materialize executors",
294 &["actor_id", "table_id", "fragment_id"],
295 registry
296 )
297 .unwrap()
298 .relabel_debug_1(level);
299
300 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
301 "stream_mview_current_epoch",
302 "The current epoch of the materialized executor",
303 &["actor_id", "table_id", "fragment_id"],
304 registry
305 )
306 .unwrap()
307 .relabel_debug_1(level);
308
309 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
310 "stream_pg_cdc_state_table_lsn",
311 "Current LSN value stored in PostgreSQL CDC state table",
312 &["source_id"],
313 registry,
314 )
315 .unwrap();
316
317 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
318 "stream_pg_cdc_jni_commit_offset_lsn",
319 "LSN value when JNI commit offset is called for PostgreSQL CDC",
320 &["source_id"],
321 registry,
322 )
323 .unwrap();
324
325 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
326 "stream_mysql_cdc_state_binlog_file_seq",
327 "Current binlog file sequence number stored in MySQL CDC state table",
328 &["source_id"],
329 registry,
330 )
331 .unwrap();
332
333 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
334 "stream_mysql_cdc_state_binlog_position",
335 "Current binlog position stored in MySQL CDC state table",
336 &["source_id"],
337 registry,
338 )
339 .unwrap();
340
341 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
342 "stream_sink_chunk_buffer_size",
343 "Total size of chunks buffered in a barrier",
344 &["sink_id", "actor_id", "fragment_id"],
345 registry
346 )
347 .unwrap();
348
349 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
350 "stream_actor_actor_execution_time",
351 "Total execution time (s) of an actor",
352 &["actor_id"],
353 registry
354 )
355 .unwrap();
356
357 let actor_output_buffer_blocking_duration_ns =
358 register_guarded_int_counter_vec_with_registry!(
359 "stream_actor_output_buffer_blocking_duration_ns",
360 "Total blocking duration (ns) of output buffer",
361 &["actor_id", "fragment_id", "downstream_fragment_id"],
362 registry
363 )
364 .unwrap()
365 .relabel_debug_1(level);
367
368 let actor_input_buffer_blocking_duration_ns =
369 register_guarded_int_counter_vec_with_registry!(
370 "stream_actor_input_buffer_blocking_duration_ns",
371 "Total blocking duration (ns) of input buffer",
372 &["actor_id", "fragment_id", "upstream_fragment_id"],
373 registry
374 )
375 .unwrap()
376 .relabel_debug_1(level);
378
379 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
380 "stream_exchange_frag_recv_size",
381 "Total size of messages that have been received from upstream Fragment",
382 &["up_fragment_id", "down_fragment_id"],
383 registry
384 )
385 .unwrap();
386
387 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
388 "stream_actor_fast_poll_duration",
389 "tokio's metrics",
390 &["actor_id"],
391 registry
392 )
393 .unwrap();
394
395 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
396 "stream_actor_fast_poll_cnt",
397 "tokio's metrics",
398 &["actor_id"],
399 registry
400 )
401 .unwrap();
402
403 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
404 "stream_actor_slow_poll_duration",
405 "tokio's metrics",
406 &["actor_id"],
407 registry
408 )
409 .unwrap();
410
411 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
412 "stream_actor_slow_poll_cnt",
413 "tokio's metrics",
414 &["actor_id"],
415 registry
416 )
417 .unwrap();
418
419 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
420 "stream_actor_poll_duration",
421 "tokio's metrics",
422 &["actor_id"],
423 registry
424 )
425 .unwrap();
426
427 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
428 "stream_actor_poll_cnt",
429 "tokio's metrics",
430 &["actor_id"],
431 registry
432 )
433 .unwrap();
434
435 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
436 "stream_actor_scheduled_duration",
437 "tokio's metrics",
438 &["actor_id"],
439 registry
440 )
441 .unwrap();
442
443 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
444 "stream_actor_scheduled_cnt",
445 "tokio's metrics",
446 &["actor_id"],
447 registry
448 )
449 .unwrap();
450
451 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
452 "stream_actor_idle_duration",
453 "tokio's metrics",
454 &["actor_id"],
455 registry
456 )
457 .unwrap();
458
459 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
460 "stream_actor_idle_cnt",
461 "tokio's metrics",
462 &["actor_id"],
463 registry
464 )
465 .unwrap();
466
467 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
468 "stream_actor_in_record_cnt",
469 "Total number of rows actor received",
470 &["actor_id", "fragment_id", "upstream_fragment_id"],
471 registry
472 )
473 .unwrap()
474 .relabel_debug_1(level);
475
476 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
477 "stream_actor_out_record_cnt",
478 "Total number of rows actor sent",
479 &["actor_id", "fragment_id"],
480 registry
481 )
482 .unwrap()
483 .relabel_debug_1(level);
484
485 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
486 "stream_actor_current_epoch",
487 "Current epoch of actor",
488 &["actor_id", "fragment_id"],
489 registry
490 )
491 .unwrap()
492 .relabel_debug_1(level);
493
494 let actor_count = register_guarded_int_gauge_vec_with_registry!(
495 "stream_actor_count",
496 "Total number of actors (parallelism)",
497 &["fragment_id"],
498 registry
499 )
500 .unwrap();
501
502 let opts = histogram_opts!(
503 "stream_merge_barrier_align_duration",
504 "Duration of merge align barrier",
505 exponential_buckets(0.0001, 2.0, 21).unwrap() );
507 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
508 opts,
509 &["actor_id", "fragment_id"],
510 registry
511 )
512 .unwrap()
513 .relabel_debug_1(level);
514
515 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
516 "stream_join_lookup_miss_count",
517 "Join executor lookup miss duration",
518 &["side", "join_table_id", "actor_id", "fragment_id"],
519 registry
520 )
521 .unwrap();
522
523 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
524 "stream_join_lookup_total_count",
525 "Join executor lookup total operation",
526 &["side", "join_table_id", "actor_id", "fragment_id"],
527 registry
528 )
529 .unwrap();
530
531 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
532 "stream_join_insert_cache_miss_count",
533 "Join executor cache miss when insert operation",
534 &["side", "join_table_id", "actor_id", "fragment_id"],
535 registry
536 )
537 .unwrap();
538
539 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
540 "stream_join_actor_input_waiting_duration_ns",
541 "Total waiting duration (ns) of input buffer of join actor",
542 &["actor_id", "fragment_id"],
543 registry
544 )
545 .unwrap();
546
547 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
548 "stream_join_match_duration_ns",
549 "Matching duration for each side",
550 &["actor_id", "fragment_id", "side"],
551 registry
552 )
553 .unwrap();
554
555 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
556 "stream_barrier_align_duration_ns",
557 "Duration of join align barrier",
558 &["actor_id", "fragment_id", "wait_side", "executor"],
559 registry
560 )
561 .unwrap()
562 .relabel_debug_1(level);
563
564 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
565 "stream_join_cached_entry_count",
566 "Number of cached entries in streaming join operators",
567 &["actor_id", "fragment_id", "side"],
568 registry
569 )
570 .unwrap();
571
572 let join_matched_join_keys_opts = histogram_opts!(
573 "stream_join_matched_join_keys",
574 "The number of keys matched in the opposite side",
575 exponential_buckets(16.0, 2.0, 28).unwrap() );
577
578 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
579 join_matched_join_keys_opts,
580 &["actor_id", "fragment_id", "table_id"],
581 registry
582 )
583 .unwrap()
584 .relabel_debug_1(level);
585
586 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
587 "stream_agg_lookup_miss_count",
588 "Aggregation executor lookup miss duration",
589 &["table_id", "actor_id", "fragment_id"],
590 registry
591 )
592 .unwrap();
593
594 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
595 "stream_agg_lookup_total_count",
596 "Aggregation executor lookup total operation",
597 &["table_id", "actor_id", "fragment_id"],
598 registry
599 )
600 .unwrap();
601
602 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
603 "stream_agg_distinct_cache_miss_count",
604 "Aggregation executor dinsinct miss duration",
605 &["table_id", "actor_id", "fragment_id"],
606 registry
607 )
608 .unwrap();
609
610 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
611 "stream_agg_distinct_total_cache_count",
612 "Aggregation executor distinct total operation",
613 &["table_id", "actor_id", "fragment_id"],
614 registry
615 )
616 .unwrap();
617
618 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
619 "stream_agg_distinct_cached_entry_count",
620 "Total entry counts in distinct aggregation executor cache",
621 &["table_id", "actor_id", "fragment_id"],
622 registry
623 )
624 .unwrap();
625
626 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
627 "stream_agg_dirty_groups_count",
628 "Total dirty group counts in aggregation executor",
629 &["table_id", "actor_id", "fragment_id"],
630 registry
631 )
632 .unwrap();
633
634 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
635 "stream_agg_dirty_groups_heap_size",
636 "Total dirty group heap size in aggregation executor",
637 &["table_id", "actor_id", "fragment_id"],
638 registry
639 )
640 .unwrap();
641
642 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
643 "stream_agg_state_cache_lookup_count",
644 "Aggregation executor state cache lookup count",
645 &["table_id", "actor_id", "fragment_id"],
646 registry
647 )
648 .unwrap();
649
650 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
651 "stream_agg_state_cache_miss_count",
652 "Aggregation executor state cache miss count",
653 &["table_id", "actor_id", "fragment_id"],
654 registry
655 )
656 .unwrap();
657
658 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
659 "stream_group_top_n_cache_miss_count",
660 "Group top n executor cache miss count",
661 &["table_id", "actor_id", "fragment_id"],
662 registry
663 )
664 .unwrap();
665
666 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
667 "stream_group_top_n_total_query_cache_count",
668 "Group top n executor query cache total count",
669 &["table_id", "actor_id", "fragment_id"],
670 registry
671 )
672 .unwrap();
673
674 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
675 "stream_group_top_n_cached_entry_count",
676 "Total entry counts in group top n executor cache",
677 &["table_id", "actor_id", "fragment_id"],
678 registry
679 )
680 .unwrap();
681
682 let group_top_n_appendonly_cache_miss_count =
683 register_guarded_int_counter_vec_with_registry!(
684 "stream_group_top_n_appendonly_cache_miss_count",
685 "Group top n appendonly executor cache miss count",
686 &["table_id", "actor_id", "fragment_id"],
687 registry
688 )
689 .unwrap();
690
691 let group_top_n_appendonly_total_query_cache_count =
692 register_guarded_int_counter_vec_with_registry!(
693 "stream_group_top_n_appendonly_total_query_cache_count",
694 "Group top n appendonly executor total cache count",
695 &["table_id", "actor_id", "fragment_id"],
696 registry
697 )
698 .unwrap();
699
700 let group_top_n_appendonly_cached_entry_count =
701 register_guarded_int_gauge_vec_with_registry!(
702 "stream_group_top_n_appendonly_cached_entry_count",
703 "Total entry counts in group top n appendonly executor cache",
704 &["table_id", "actor_id", "fragment_id"],
705 registry
706 )
707 .unwrap();
708
709 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
710 "stream_lookup_cache_miss_count",
711 "Lookup executor cache miss count",
712 &["table_id", "actor_id", "fragment_id"],
713 registry
714 )
715 .unwrap();
716
717 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
718 "stream_lookup_total_query_cache_count",
719 "Lookup executor query cache total count",
720 &["table_id", "actor_id", "fragment_id"],
721 registry
722 )
723 .unwrap();
724
725 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726 "stream_lookup_cached_entry_count",
727 "Total entry counts in lookup executor cache",
728 &["table_id", "actor_id", "fragment_id"],
729 registry
730 )
731 .unwrap();
732
733 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
734 "stream_temporal_join_cache_miss_count",
735 "Temporal join executor cache miss count",
736 &["table_id", "actor_id", "fragment_id"],
737 registry
738 )
739 .unwrap();
740
741 let temporal_join_total_query_cache_count =
742 register_guarded_int_counter_vec_with_registry!(
743 "stream_temporal_join_total_query_cache_count",
744 "Temporal join executor query cache total count",
745 &["table_id", "actor_id", "fragment_id"],
746 registry
747 )
748 .unwrap();
749
750 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
751 "stream_temporal_join_cached_entry_count",
752 "Total entry count in temporal join executor cache",
753 &["table_id", "actor_id", "fragment_id"],
754 registry
755 )
756 .unwrap();
757
758 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
759 "stream_agg_cached_entry_count",
760 "Number of cached keys in streaming aggregation operators",
761 &["table_id", "actor_id", "fragment_id"],
762 registry
763 )
764 .unwrap();
765
766 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
767 "stream_agg_chunk_lookup_miss_count",
768 "Aggregation executor chunk-level lookup miss duration",
769 &["table_id", "actor_id", "fragment_id"],
770 registry
771 )
772 .unwrap();
773
774 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
775 "stream_agg_chunk_lookup_total_count",
776 "Aggregation executor chunk-level lookup total operation",
777 &["table_id", "actor_id", "fragment_id"],
778 registry
779 )
780 .unwrap();
781
782 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
783 "stream_backfill_snapshot_read_row_count",
784 "Total number of rows that have been read from the backfill snapshot",
785 &["table_id", "actor_id"],
786 registry
787 )
788 .unwrap();
789
790 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
791 "stream_backfill_upstream_output_row_count",
792 "Total number of rows that have been output from the backfill upstream",
793 &["table_id", "actor_id"],
794 registry
795 )
796 .unwrap();
797
798 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
799 "stream_cdc_backfill_snapshot_read_row_count",
800 "Total number of rows that have been read from the cdc_backfill snapshot",
801 &["table_id", "actor_id"],
802 registry
803 )
804 .unwrap();
805
806 let cdc_backfill_upstream_output_row_count =
807 register_guarded_int_counter_vec_with_registry!(
808 "stream_cdc_backfill_upstream_output_row_count",
809 "Total number of rows that have been output from the cdc_backfill upstream",
810 &["table_id", "actor_id"],
811 registry
812 )
813 .unwrap();
814
815 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
816 "stream_snapshot_backfill_consume_snapshot_row_count",
817 "Total number of rows that have been output from snapshot backfill",
818 &["table_id", "actor_id", "stage"],
819 registry
820 )
821 .unwrap();
822
823 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
824 "stream_over_window_cached_entry_count",
825 "Total entry (partition) count in over window executor cache",
826 &["table_id", "actor_id", "fragment_id"],
827 registry
828 )
829 .unwrap();
830
831 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
832 "stream_over_window_cache_lookup_count",
833 "Over window executor cache lookup count",
834 &["table_id", "actor_id", "fragment_id"],
835 registry
836 )
837 .unwrap();
838
839 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
840 "stream_over_window_cache_miss_count",
841 "Over window executor cache miss count",
842 &["table_id", "actor_id", "fragment_id"],
843 registry
844 )
845 .unwrap();
846
847 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
848 "stream_over_window_range_cache_entry_count",
849 "Over window partition range cache entry count",
850 &["table_id", "actor_id", "fragment_id"],
851 registry,
852 )
853 .unwrap();
854
855 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
856 "stream_over_window_range_cache_lookup_count",
857 "Over window partition range cache lookup count",
858 &["table_id", "actor_id", "fragment_id"],
859 registry
860 )
861 .unwrap();
862
863 let over_window_range_cache_left_miss_count =
864 register_guarded_int_counter_vec_with_registry!(
865 "stream_over_window_range_cache_left_miss_count",
866 "Over window partition range cache left miss count",
867 &["table_id", "actor_id", "fragment_id"],
868 registry
869 )
870 .unwrap();
871
872 let over_window_range_cache_right_miss_count =
873 register_guarded_int_counter_vec_with_registry!(
874 "stream_over_window_range_cache_right_miss_count",
875 "Over window partition range cache right miss count",
876 &["table_id", "actor_id", "fragment_id"],
877 registry
878 )
879 .unwrap();
880
881 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
882 "stream_over_window_accessed_entry_count",
883 "Over window accessed entry count",
884 &["table_id", "actor_id", "fragment_id"],
885 registry
886 )
887 .unwrap();
888
889 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
890 "stream_over_window_compute_count",
891 "Over window compute count",
892 &["table_id", "actor_id", "fragment_id"],
893 registry
894 )
895 .unwrap();
896
897 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
898 "stream_over_window_same_output_count",
899 "Over window same output count",
900 &["table_id", "actor_id", "fragment_id"],
901 registry
902 )
903 .unwrap();
904
905 let opts = histogram_opts!(
906 "stream_barrier_inflight_duration_seconds",
907 "barrier_inflight_latency",
908 exponential_buckets(0.1, 1.5, 16).unwrap() );
910 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
911
912 let opts = histogram_opts!(
913 "stream_barrier_sync_storage_duration_seconds",
914 "barrier_sync_latency",
915 exponential_buckets(0.1, 1.5, 16).unwrap() );
917 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
918
919 let opts = histogram_opts!(
920 "stream_barrier_batch_size",
921 "barrier_batch_size",
922 exponential_buckets(1.0, 2.0, 8).unwrap()
923 );
924 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
925
926 let barrier_manager_progress = register_int_counter_with_registry!(
927 "stream_barrier_manager_progress",
928 "The number of actors that have processed the earliest in-flight barriers",
929 registry
930 )
931 .unwrap();
932
933 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
934 "sync_kv_log_store_wait_next_poll_ns",
935 "Total duration (ns) of waiting for next poll",
936 &["actor_id", "target", "fragment_id", "relation"],
937 registry
938 )
939 .unwrap();
940
941 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
942 "sync_kv_log_store_read_count",
943 "read row count throughput of sync_kv log store",
944 &["type", "actor_id", "target", "fragment_id", "relation"],
945 registry
946 )
947 .unwrap();
948
949 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
950 "sync_kv_log_store_read_size",
951 "read size throughput of sync_kv log store",
952 &["type", "actor_id", "target", "fragment_id", "relation"],
953 registry
954 )
955 .unwrap();
956
957 let sync_kv_log_store_write_pause_duration_ns =
958 register_guarded_int_counter_vec_with_registry!(
959 "sync_kv_log_store_write_pause_duration_ns",
960 "Duration (ns) of sync_kv log store write pause",
961 &["actor_id", "target", "fragment_id", "relation"],
962 registry
963 )
964 .unwrap();
965
966 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
967 "sync_kv_log_store_state",
968 "clean/unclean state transition for sync_kv log store",
969 &["state", "actor_id", "target", "fragment_id", "relation"],
970 registry
971 )
972 .unwrap();
973
974 let sync_kv_log_store_storage_write_count =
975 register_guarded_int_counter_vec_with_registry!(
976 "sync_kv_log_store_storage_write_count",
977 "Write row count throughput of sync_kv log store",
978 &["actor_id", "target", "fragment_id", "relation"],
979 registry
980 )
981 .unwrap();
982
983 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
984 "sync_kv_log_store_storage_write_size",
985 "Write size throughput of sync_kv log store",
986 &["actor_id", "target", "fragment_id", "relation"],
987 registry
988 )
989 .unwrap();
990
991 let sync_kv_log_store_buffer_unconsumed_item_count =
992 register_guarded_int_gauge_vec_with_registry!(
993 "sync_kv_log_store_buffer_unconsumed_item_count",
994 "Number of Unconsumed Item in buffer",
995 &["actor_id", "target", "fragment_id", "relation"],
996 registry
997 )
998 .unwrap();
999
1000 let sync_kv_log_store_buffer_unconsumed_row_count =
1001 register_guarded_int_gauge_vec_with_registry!(
1002 "sync_kv_log_store_buffer_unconsumed_row_count",
1003 "Number of Unconsumed Row in buffer",
1004 &["actor_id", "target", "fragment_id", "relation"],
1005 registry
1006 )
1007 .unwrap();
1008
1009 let sync_kv_log_store_buffer_unconsumed_epoch_count =
1010 register_guarded_int_gauge_vec_with_registry!(
1011 "sync_kv_log_store_buffer_unconsumed_epoch_count",
1012 "Number of Unconsumed Epoch in buffer",
1013 &["actor_id", "target", "fragment_id", "relation"],
1014 registry
1015 )
1016 .unwrap();
1017
1018 let sync_kv_log_store_buffer_unconsumed_min_epoch =
1019 register_guarded_int_gauge_vec_with_registry!(
1020 "sync_kv_log_store_buffer_unconsumed_min_epoch",
1021 "Number of Unconsumed Epoch in buffer",
1022 &["actor_id", "target", "fragment_id", "relation"],
1023 registry
1024 )
1025 .unwrap();
1026
1027 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1028 "kv_log_store_storage_write_count",
1029 "Write row count throughput of kv log store",
1030 &["actor_id", "connector", "sink_id", "sink_name"],
1031 registry
1032 )
1033 .unwrap();
1034
1035 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1036 "kv_log_store_storage_write_size",
1037 "Write size throughput of kv log store",
1038 &["actor_id", "connector", "sink_id", "sink_name"],
1039 registry
1040 )
1041 .unwrap();
1042
1043 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1044 "kv_log_store_storage_read_count",
1045 "Write row count throughput of kv log store",
1046 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1047 registry
1048 )
1049 .unwrap();
1050
1051 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1052 "kv_log_store_storage_read_size",
1053 "Write size throughput of kv log store",
1054 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1055 registry
1056 )
1057 .unwrap();
1058
1059 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1060 "kv_log_store_rewind_count",
1061 "Kv log store rewind rate",
1062 &["actor_id", "connector", "sink_id", "sink_name"],
1063 registry
1064 )
1065 .unwrap();
1066
1067 let kv_log_store_rewind_delay_opts = {
1068 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1069 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1070 - REWIND_BASE_DELAY.as_secs_f64().log2())
1071 .ceil() as usize;
1072 let buckets = exponential_buckets(
1073 REWIND_BASE_DELAY.as_secs_f64(),
1074 REWIND_BACKOFF_FACTOR as _,
1075 bucket_count,
1076 )
1077 .unwrap();
1078 histogram_opts!(
1079 "kv_log_store_rewind_delay",
1080 "Kv log store rewind delay",
1081 buckets,
1082 )
1083 };
1084
1085 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1086 kv_log_store_rewind_delay_opts,
1087 &["actor_id", "connector", "sink_id", "sink_name"],
1088 registry
1089 )
1090 .unwrap();
1091
1092 let kv_log_store_buffer_unconsumed_item_count =
1093 register_guarded_int_gauge_vec_with_registry!(
1094 "kv_log_store_buffer_unconsumed_item_count",
1095 "Number of Unconsumed Item in buffer",
1096 &["actor_id", "connector", "sink_id", "sink_name"],
1097 registry
1098 )
1099 .unwrap();
1100
1101 let kv_log_store_buffer_unconsumed_row_count =
1102 register_guarded_int_gauge_vec_with_registry!(
1103 "kv_log_store_buffer_unconsumed_row_count",
1104 "Number of Unconsumed Row in buffer",
1105 &["actor_id", "connector", "sink_id", "sink_name"],
1106 registry
1107 )
1108 .unwrap();
1109
1110 let kv_log_store_buffer_unconsumed_epoch_count =
1111 register_guarded_int_gauge_vec_with_registry!(
1112 "kv_log_store_buffer_unconsumed_epoch_count",
1113 "Number of Unconsumed Epoch in buffer",
1114 &["actor_id", "connector", "sink_id", "sink_name"],
1115 registry
1116 )
1117 .unwrap();
1118
1119 let kv_log_store_buffer_unconsumed_min_epoch =
1120 register_guarded_int_gauge_vec_with_registry!(
1121 "kv_log_store_buffer_unconsumed_min_epoch",
1122 "Number of Unconsumed Epoch in buffer",
1123 &["actor_id", "connector", "sink_id", "sink_name"],
1124 registry
1125 )
1126 .unwrap();
1127
1128 let lru_runtime_loop_count = register_int_counter_with_registry!(
1129 "lru_runtime_loop_count",
1130 "The counts of the eviction loop in LRU manager per second",
1131 registry
1132 )
1133 .unwrap();
1134
1135 let lru_latest_sequence = register_int_gauge_with_registry!(
1136 "lru_latest_sequence",
1137 "Current LRU global sequence",
1138 registry,
1139 )
1140 .unwrap();
1141
1142 let lru_watermark_sequence = register_int_gauge_with_registry!(
1143 "lru_watermark_sequence",
1144 "Current LRU watermark sequence",
1145 registry,
1146 )
1147 .unwrap();
1148
1149 let lru_eviction_policy = register_int_gauge_with_registry!(
1150 "lru_eviction_policy",
1151 "Current LRU eviction policy",
1152 registry,
1153 )
1154 .unwrap();
1155
1156 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1157 "jemalloc_allocated_bytes",
1158 "The allocated memory jemalloc, got from jemalloc_ctl",
1159 registry
1160 )
1161 .unwrap();
1162
1163 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1164 "jemalloc_active_bytes",
1165 "The active memory jemalloc, got from jemalloc_ctl",
1166 registry
1167 )
1168 .unwrap();
1169
1170 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1171 "jemalloc_resident_bytes",
1172 "The active memory jemalloc, got from jemalloc_ctl",
1173 registry
1174 )
1175 .unwrap();
1176
1177 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1178 "jemalloc_metadata_bytes",
1179 "The active memory jemalloc, got from jemalloc_ctl",
1180 registry
1181 )
1182 .unwrap();
1183
1184 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1185 "jvm_allocated_bytes",
1186 "The allocated jvm memory",
1187 registry
1188 )
1189 .unwrap();
1190
1191 let jvm_active_bytes = register_int_gauge_with_registry!(
1192 "jvm_active_bytes",
1193 "The active jvm memory",
1194 registry
1195 )
1196 .unwrap();
1197
1198 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1199 "stream_materialize_cache_hit_count",
1200 "Materialize executor cache hit count",
1201 &["actor_id", "table_id", "fragment_id"],
1202 registry
1203 )
1204 .unwrap()
1205 .relabel_debug_1(level);
1206
1207 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1208 "stream_materialize_data_exist_count",
1209 "Materialize executor data exist count",
1210 &["actor_id", "table_id", "fragment_id"],
1211 registry
1212 )
1213 .unwrap()
1214 .relabel_debug_1(level);
1215
1216 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1217 "stream_materialize_cache_total_count",
1218 "Materialize executor cache total operation",
1219 &["actor_id", "table_id", "fragment_id"],
1220 registry
1221 )
1222 .unwrap()
1223 .relabel_debug_1(level);
1224
1225 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1226 "stream_memory_usage",
1227 "Memory usage for stream executors",
1228 &["actor_id", "table_id", "desc"],
1229 registry
1230 )
1231 .unwrap()
1232 .relabel_debug_1(level);
1233
1234 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1235 "gap_fill_generated_rows_count",
1236 "Total number of rows generated by gap fill executor",
1237 &["actor_id", "fragment_id"],
1238 registry
1239 )
1240 .unwrap()
1241 .relabel_debug_1(level);
1242
1243 Self {
1244 level,
1245 executor_row_count,
1246 mem_stream_node_output_row_count: stream_node_output_row_count,
1247 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1248 actor_execution_time,
1249 actor_scheduled_duration,
1250 actor_scheduled_cnt,
1251 actor_fast_poll_duration,
1252 actor_fast_poll_cnt,
1253 actor_slow_poll_duration,
1254 actor_slow_poll_cnt,
1255 actor_poll_duration,
1256 actor_poll_cnt,
1257 actor_idle_duration,
1258 actor_idle_cnt,
1259 actor_count,
1260 actor_in_record_cnt,
1261 actor_out_record_cnt,
1262 actor_current_epoch,
1263 source_output_row_count,
1264 source_split_change_count,
1265 source_backfill_row_count,
1266 sink_input_row_count,
1267 sink_input_bytes,
1268 sink_chunk_buffer_size,
1269 exchange_frag_recv_size,
1270 merge_barrier_align_duration,
1271 actor_output_buffer_blocking_duration_ns,
1272 actor_input_buffer_blocking_duration_ns,
1273 join_lookup_miss_count,
1274 join_lookup_total_count,
1275 join_insert_cache_miss_count,
1276 join_actor_input_waiting_duration_ns,
1277 join_match_duration_ns,
1278 join_cached_entry_count,
1279 join_matched_join_keys,
1280 barrier_align_duration,
1281 agg_lookup_miss_count,
1282 agg_total_lookup_count,
1283 agg_cached_entry_count,
1284 agg_chunk_lookup_miss_count,
1285 agg_chunk_total_lookup_count,
1286 agg_dirty_groups_count,
1287 agg_dirty_groups_heap_size,
1288 agg_distinct_cache_miss_count,
1289 agg_distinct_total_cache_count,
1290 agg_distinct_cached_entry_count,
1291 agg_state_cache_lookup_count,
1292 agg_state_cache_miss_count,
1293 group_top_n_cache_miss_count,
1294 group_top_n_total_query_cache_count,
1295 group_top_n_cached_entry_count,
1296 group_top_n_appendonly_cache_miss_count,
1297 group_top_n_appendonly_total_query_cache_count,
1298 group_top_n_appendonly_cached_entry_count,
1299 lookup_cache_miss_count,
1300 lookup_total_query_cache_count,
1301 lookup_cached_entry_count,
1302 temporal_join_cache_miss_count,
1303 temporal_join_total_query_cache_count,
1304 temporal_join_cached_entry_count,
1305 backfill_snapshot_read_row_count,
1306 backfill_upstream_output_row_count,
1307 cdc_backfill_snapshot_read_row_count,
1308 cdc_backfill_upstream_output_row_count,
1309 snapshot_backfill_consume_row_count,
1310 over_window_cached_entry_count,
1311 over_window_cache_lookup_count,
1312 over_window_cache_miss_count,
1313 over_window_range_cache_entry_count,
1314 over_window_range_cache_lookup_count,
1315 over_window_range_cache_left_miss_count,
1316 over_window_range_cache_right_miss_count,
1317 over_window_accessed_entry_count,
1318 over_window_compute_count,
1319 over_window_same_output_count,
1320 barrier_inflight_latency,
1321 barrier_sync_latency,
1322 barrier_batch_size,
1323 barrier_manager_progress,
1324 kv_log_store_storage_write_count,
1325 kv_log_store_storage_write_size,
1326 kv_log_store_rewind_count,
1327 kv_log_store_rewind_delay,
1328 kv_log_store_storage_read_count,
1329 kv_log_store_storage_read_size,
1330 kv_log_store_buffer_unconsumed_item_count,
1331 kv_log_store_buffer_unconsumed_row_count,
1332 kv_log_store_buffer_unconsumed_epoch_count,
1333 kv_log_store_buffer_unconsumed_min_epoch,
1334 sync_kv_log_store_read_count,
1335 sync_kv_log_store_read_size,
1336 sync_kv_log_store_write_pause_duration_ns,
1337 sync_kv_log_store_state,
1338 sync_kv_log_store_wait_next_poll_ns,
1339 sync_kv_log_store_storage_write_count,
1340 sync_kv_log_store_storage_write_size,
1341 sync_kv_log_store_buffer_unconsumed_item_count,
1342 sync_kv_log_store_buffer_unconsumed_row_count,
1343 sync_kv_log_store_buffer_unconsumed_epoch_count,
1344 sync_kv_log_store_buffer_unconsumed_min_epoch,
1345 lru_runtime_loop_count,
1346 lru_latest_sequence,
1347 lru_watermark_sequence,
1348 lru_eviction_policy,
1349 jemalloc_allocated_bytes,
1350 jemalloc_active_bytes,
1351 jemalloc_resident_bytes,
1352 jemalloc_metadata_bytes,
1353 jvm_allocated_bytes,
1354 jvm_active_bytes,
1355 stream_memory_usage,
1356 materialize_cache_hit_count,
1357 materialize_data_exist_count,
1358 materialize_cache_total_count,
1359 materialize_input_row_count,
1360 materialize_current_epoch,
1361 pg_cdc_state_table_lsn,
1362 pg_cdc_jni_commit_offset_lsn,
1363 mysql_cdc_state_binlog_file_seq,
1364 mysql_cdc_state_binlog_position,
1365 gap_fill_generated_rows_count,
1366 }
1367 }
1368
1369 pub fn unused() -> Self {
1371 global_streaming_metrics(MetricLevel::Disabled)
1372 }
1373
1374 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1375 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1376 let actor_execution_time = self
1377 .actor_execution_time
1378 .with_guarded_label_values(label_list);
1379 let actor_scheduled_duration = self
1380 .actor_scheduled_duration
1381 .with_guarded_label_values(label_list);
1382 let actor_scheduled_cnt = self
1383 .actor_scheduled_cnt
1384 .with_guarded_label_values(label_list);
1385 let actor_fast_poll_duration = self
1386 .actor_fast_poll_duration
1387 .with_guarded_label_values(label_list);
1388 let actor_fast_poll_cnt = self
1389 .actor_fast_poll_cnt
1390 .with_guarded_label_values(label_list);
1391 let actor_slow_poll_duration = self
1392 .actor_slow_poll_duration
1393 .with_guarded_label_values(label_list);
1394 let actor_slow_poll_cnt = self
1395 .actor_slow_poll_cnt
1396 .with_guarded_label_values(label_list);
1397 let actor_poll_duration = self
1398 .actor_poll_duration
1399 .with_guarded_label_values(label_list);
1400 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1401 let actor_idle_duration = self
1402 .actor_idle_duration
1403 .with_guarded_label_values(label_list);
1404 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1405 ActorMetrics {
1406 actor_execution_time,
1407 actor_scheduled_duration,
1408 actor_scheduled_cnt,
1409 actor_fast_poll_duration,
1410 actor_fast_poll_cnt,
1411 actor_slow_poll_duration,
1412 actor_slow_poll_cnt,
1413 actor_poll_duration,
1414 actor_poll_cnt,
1415 actor_idle_duration,
1416 actor_idle_cnt,
1417 }
1418 }
1419
1420 pub(crate) fn new_actor_input_metrics(
1421 &self,
1422 actor_id: ActorId,
1423 fragment_id: FragmentId,
1424 upstream_fragment_id: FragmentId,
1425 ) -> ActorInputMetrics {
1426 let actor_id_str = actor_id.to_string();
1427 let fragment_id_str = fragment_id.to_string();
1428 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1429 ActorInputMetrics {
1430 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1431 &actor_id_str,
1432 &fragment_id_str,
1433 &upstream_fragment_id_str,
1434 ]),
1435 actor_input_buffer_blocking_duration_ns: self
1436 .actor_input_buffer_blocking_duration_ns
1437 .with_guarded_label_values(&[
1438 &actor_id_str,
1439 &fragment_id_str,
1440 &upstream_fragment_id_str,
1441 ]),
1442 }
1443 }
1444
1445 pub fn new_sink_exec_metrics(
1446 &self,
1447 id: SinkId,
1448 actor_id: ActorId,
1449 fragment_id: FragmentId,
1450 ) -> SinkExecutorMetrics {
1451 let label_list: &[&str; 3] = &[
1452 &id.to_string(),
1453 &actor_id.to_string(),
1454 &fragment_id.to_string(),
1455 ];
1456 SinkExecutorMetrics {
1457 sink_input_row_count: self
1458 .sink_input_row_count
1459 .with_guarded_label_values(label_list),
1460 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1461 sink_chunk_buffer_size: self
1462 .sink_chunk_buffer_size
1463 .with_guarded_label_values(label_list),
1464 }
1465 }
1466
1467 pub fn new_group_top_n_metrics(
1468 &self,
1469 table_id: TableId,
1470 actor_id: ActorId,
1471 fragment_id: FragmentId,
1472 ) -> GroupTopNMetrics {
1473 let label_list: &[&str; 3] = &[
1474 &table_id.to_string(),
1475 &actor_id.to_string(),
1476 &fragment_id.to_string(),
1477 ];
1478
1479 GroupTopNMetrics {
1480 group_top_n_cache_miss_count: self
1481 .group_top_n_cache_miss_count
1482 .with_guarded_label_values(label_list),
1483 group_top_n_total_query_cache_count: self
1484 .group_top_n_total_query_cache_count
1485 .with_guarded_label_values(label_list),
1486 group_top_n_cached_entry_count: self
1487 .group_top_n_cached_entry_count
1488 .with_guarded_label_values(label_list),
1489 }
1490 }
1491
1492 pub fn new_append_only_group_top_n_metrics(
1493 &self,
1494 table_id: TableId,
1495 actor_id: ActorId,
1496 fragment_id: FragmentId,
1497 ) -> GroupTopNMetrics {
1498 let label_list: &[&str; 3] = &[
1499 &table_id.to_string(),
1500 &actor_id.to_string(),
1501 &fragment_id.to_string(),
1502 ];
1503
1504 GroupTopNMetrics {
1505 group_top_n_cache_miss_count: self
1506 .group_top_n_appendonly_cache_miss_count
1507 .with_guarded_label_values(label_list),
1508 group_top_n_total_query_cache_count: self
1509 .group_top_n_appendonly_total_query_cache_count
1510 .with_guarded_label_values(label_list),
1511 group_top_n_cached_entry_count: self
1512 .group_top_n_appendonly_cached_entry_count
1513 .with_guarded_label_values(label_list),
1514 }
1515 }
1516
1517 pub fn new_lookup_executor_metrics(
1518 &self,
1519 table_id: TableId,
1520 actor_id: ActorId,
1521 fragment_id: FragmentId,
1522 ) -> LookupExecutorMetrics {
1523 let label_list: &[&str; 3] = &[
1524 &table_id.to_string(),
1525 &actor_id.to_string(),
1526 &fragment_id.to_string(),
1527 ];
1528
1529 LookupExecutorMetrics {
1530 lookup_cache_miss_count: self
1531 .lookup_cache_miss_count
1532 .with_guarded_label_values(label_list),
1533 lookup_total_query_cache_count: self
1534 .lookup_total_query_cache_count
1535 .with_guarded_label_values(label_list),
1536 lookup_cached_entry_count: self
1537 .lookup_cached_entry_count
1538 .with_guarded_label_values(label_list),
1539 }
1540 }
1541
1542 pub fn new_hash_agg_metrics(
1543 &self,
1544 table_id: TableId,
1545 actor_id: ActorId,
1546 fragment_id: FragmentId,
1547 ) -> HashAggMetrics {
1548 let label_list: &[&str; 3] = &[
1549 &table_id.to_string(),
1550 &actor_id.to_string(),
1551 &fragment_id.to_string(),
1552 ];
1553 HashAggMetrics {
1554 agg_lookup_miss_count: self
1555 .agg_lookup_miss_count
1556 .with_guarded_label_values(label_list),
1557 agg_total_lookup_count: self
1558 .agg_total_lookup_count
1559 .with_guarded_label_values(label_list),
1560 agg_cached_entry_count: self
1561 .agg_cached_entry_count
1562 .with_guarded_label_values(label_list),
1563 agg_chunk_lookup_miss_count: self
1564 .agg_chunk_lookup_miss_count
1565 .with_guarded_label_values(label_list),
1566 agg_chunk_total_lookup_count: self
1567 .agg_chunk_total_lookup_count
1568 .with_guarded_label_values(label_list),
1569 agg_dirty_groups_count: self
1570 .agg_dirty_groups_count
1571 .with_guarded_label_values(label_list),
1572 agg_dirty_groups_heap_size: self
1573 .agg_dirty_groups_heap_size
1574 .with_guarded_label_values(label_list),
1575 agg_state_cache_lookup_count: self
1576 .agg_state_cache_lookup_count
1577 .with_guarded_label_values(label_list),
1578 agg_state_cache_miss_count: self
1579 .agg_state_cache_miss_count
1580 .with_guarded_label_values(label_list),
1581 }
1582 }
1583
1584 pub fn new_agg_distinct_dedup_metrics(
1585 &self,
1586 table_id: TableId,
1587 actor_id: ActorId,
1588 fragment_id: FragmentId,
1589 ) -> AggDistinctDedupMetrics {
1590 let label_list: &[&str; 3] = &[
1591 &table_id.to_string(),
1592 &actor_id.to_string(),
1593 &fragment_id.to_string(),
1594 ];
1595 AggDistinctDedupMetrics {
1596 agg_distinct_cache_miss_count: self
1597 .agg_distinct_cache_miss_count
1598 .with_guarded_label_values(label_list),
1599 agg_distinct_total_cache_count: self
1600 .agg_distinct_total_cache_count
1601 .with_guarded_label_values(label_list),
1602 agg_distinct_cached_entry_count: self
1603 .agg_distinct_cached_entry_count
1604 .with_guarded_label_values(label_list),
1605 }
1606 }
1607
1608 pub fn new_temporal_join_metrics(
1609 &self,
1610 table_id: TableId,
1611 actor_id: ActorId,
1612 fragment_id: FragmentId,
1613 ) -> TemporalJoinMetrics {
1614 let label_list: &[&str; 3] = &[
1615 &table_id.to_string(),
1616 &actor_id.to_string(),
1617 &fragment_id.to_string(),
1618 ];
1619 TemporalJoinMetrics {
1620 temporal_join_cache_miss_count: self
1621 .temporal_join_cache_miss_count
1622 .with_guarded_label_values(label_list),
1623 temporal_join_total_query_cache_count: self
1624 .temporal_join_total_query_cache_count
1625 .with_guarded_label_values(label_list),
1626 temporal_join_cached_entry_count: self
1627 .temporal_join_cached_entry_count
1628 .with_guarded_label_values(label_list),
1629 }
1630 }
1631
1632 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1633 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1634 BackfillMetrics {
1635 backfill_snapshot_read_row_count: self
1636 .backfill_snapshot_read_row_count
1637 .with_guarded_label_values(label_list),
1638 backfill_upstream_output_row_count: self
1639 .backfill_upstream_output_row_count
1640 .with_guarded_label_values(label_list),
1641 }
1642 }
1643
1644 pub fn new_cdc_backfill_metrics(
1645 &self,
1646 table_id: TableId,
1647 actor_id: ActorId,
1648 ) -> CdcBackfillMetrics {
1649 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1650 CdcBackfillMetrics {
1651 cdc_backfill_snapshot_read_row_count: self
1652 .cdc_backfill_snapshot_read_row_count
1653 .with_guarded_label_values(label_list),
1654 cdc_backfill_upstream_output_row_count: self
1655 .cdc_backfill_upstream_output_row_count
1656 .with_guarded_label_values(label_list),
1657 }
1658 }
1659
1660 pub fn new_over_window_metrics(
1661 &self,
1662 table_id: TableId,
1663 actor_id: ActorId,
1664 fragment_id: FragmentId,
1665 ) -> OverWindowMetrics {
1666 let label_list: &[&str; 3] = &[
1667 &table_id.to_string(),
1668 &actor_id.to_string(),
1669 &fragment_id.to_string(),
1670 ];
1671 OverWindowMetrics {
1672 over_window_cached_entry_count: self
1673 .over_window_cached_entry_count
1674 .with_guarded_label_values(label_list),
1675 over_window_cache_lookup_count: self
1676 .over_window_cache_lookup_count
1677 .with_guarded_label_values(label_list),
1678 over_window_cache_miss_count: self
1679 .over_window_cache_miss_count
1680 .with_guarded_label_values(label_list),
1681 over_window_range_cache_entry_count: self
1682 .over_window_range_cache_entry_count
1683 .with_guarded_label_values(label_list),
1684 over_window_range_cache_lookup_count: self
1685 .over_window_range_cache_lookup_count
1686 .with_guarded_label_values(label_list),
1687 over_window_range_cache_left_miss_count: self
1688 .over_window_range_cache_left_miss_count
1689 .with_guarded_label_values(label_list),
1690 over_window_range_cache_right_miss_count: self
1691 .over_window_range_cache_right_miss_count
1692 .with_guarded_label_values(label_list),
1693 over_window_accessed_entry_count: self
1694 .over_window_accessed_entry_count
1695 .with_guarded_label_values(label_list),
1696 over_window_compute_count: self
1697 .over_window_compute_count
1698 .with_guarded_label_values(label_list),
1699 over_window_same_output_count: self
1700 .over_window_same_output_count
1701 .with_guarded_label_values(label_list),
1702 }
1703 }
1704
1705 pub fn new_materialize_metrics(
1706 &self,
1707 table_id: TableId,
1708 actor_id: ActorId,
1709 fragment_id: FragmentId,
1710 ) -> MaterializeMetrics {
1711 let label_list: &[&str; 3] = &[
1712 &actor_id.to_string(),
1713 &table_id.to_string(),
1714 &fragment_id.to_string(),
1715 ];
1716 MaterializeMetrics {
1717 materialize_cache_hit_count: self
1718 .materialize_cache_hit_count
1719 .with_guarded_label_values(label_list),
1720 materialize_data_exist_count: self
1721 .materialize_data_exist_count
1722 .with_guarded_label_values(label_list),
1723 materialize_cache_total_count: self
1724 .materialize_cache_total_count
1725 .with_guarded_label_values(label_list),
1726 materialize_input_row_count: self
1727 .materialize_input_row_count
1728 .with_guarded_label_values(label_list),
1729 materialize_current_epoch: self
1730 .materialize_current_epoch
1731 .with_guarded_label_values(label_list),
1732 }
1733 }
1734}
1735
/// Input-side metric handles for one actor, as built by
/// `StreamingMetrics::new_actor_input_metrics` from the label values
/// (actor id, fragment id, upstream fragment id).
pub(crate) struct ActorInputMetrics {
    /// Counter handle taken from the `actor_in_record_cnt` vector.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Counter handle taken from the `actor_input_buffer_blocking_duration_ns`
    /// vector; the `_ns` suffix suggests nanoseconds — confirm at the update site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1740
/// Per-actor metric handles built by `StreamingMetrics::new_actor_metrics`.
///
/// Every field is bound to a single label value, the actor id. Fields ending
/// in `_duration` / `_time` are `LabelGuardedGauge`; fields ending in `_cnt`
/// are `LabelGuardedIntGauge`.
pub struct ActorMetrics {
    /// Gauge handle from the `actor_execution_time` vector.
    pub actor_execution_time: LabelGuardedGauge,
    /// Gauge handle from the `actor_scheduled_duration` vector.
    pub actor_scheduled_duration: LabelGuardedGauge,
    /// Integer gauge handle from the `actor_scheduled_cnt` vector.
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    /// Gauge handle from the `actor_fast_poll_duration` vector.
    pub actor_fast_poll_duration: LabelGuardedGauge,
    /// Integer gauge handle from the `actor_fast_poll_cnt` vector.
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    /// Gauge handle from the `actor_slow_poll_duration` vector.
    pub actor_slow_poll_duration: LabelGuardedGauge,
    /// Integer gauge handle from the `actor_slow_poll_cnt` vector.
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    /// Gauge handle from the `actor_poll_duration` vector.
    pub actor_poll_duration: LabelGuardedGauge,
    /// Integer gauge handle from the `actor_poll_cnt` vector.
    pub actor_poll_cnt: LabelGuardedIntGauge,
    /// Gauge handle from the `actor_idle_duration` vector.
    pub actor_idle_duration: LabelGuardedGauge,
    /// Integer gauge handle from the `actor_idle_cnt` vector.
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1755
/// Metric handles for one sink executor, built by
/// `StreamingMetrics::new_sink_exec_metrics` from the label values
/// (sink id, actor id, fragment id).
pub struct SinkExecutorMetrics {
    /// Counter handle from the `sink_input_row_count` vector.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter handle from the `sink_input_bytes` vector.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Integer gauge handle from the `sink_chunk_buffer_size` vector.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1761
/// Metric handles for one materialize executor, built by
/// `StreamingMetrics::new_materialize_metrics` from the label values
/// (actor id, table id, fragment id) — note that the actor id comes first.
pub struct MaterializeMetrics {
    /// Handle for `stream_materialize_cache_hit_count`
    /// ("Materialize executor cache hit count").
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Handle for `stream_materialize_data_exist_count`
    /// ("Materialize executor data exist count").
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Handle for `stream_materialize_cache_total_count`
    /// ("Materialize executor cache total operation").
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    /// Counter handle from the `materialize_input_row_count` vector.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `materialize_current_epoch` vector.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1769
/// Cache metric handles for a group top-n executor, labelled with
/// (table id, actor id, fragment id).
///
/// Produced by both `StreamingMetrics::new_group_top_n_metrics` (backed by the
/// `group_top_n_*` vectors) and
/// `StreamingMetrics::new_append_only_group_top_n_metrics` (backed by the
/// `group_top_n_appendonly_*` vectors); the field names are the same in both cases.
pub struct GroupTopNMetrics {
    /// Counter handle for cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle for total cache queries.
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Integer gauge handle for the cached entry count.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1775
/// Cache metric handles for one lookup executor, built by
/// `StreamingMetrics::new_lookup_executor_metrics` from the label values
/// (table id, actor id, fragment id).
pub struct LookupExecutorMetrics {
    /// Counter handle from the `lookup_cache_miss_count` vector.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `lookup_total_query_cache_count` vector.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `lookup_cached_entry_count` vector.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1781
/// Metric handles for one hash aggregation executor, built by
/// `StreamingMetrics::new_hash_agg_metrics` from the label values
/// (table id, actor id, fragment id).
pub struct HashAggMetrics {
    /// Counter handle from the `agg_lookup_miss_count` vector.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `agg_total_lookup_count` vector.
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `agg_cached_entry_count` vector.
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    /// Counter handle from the `agg_chunk_lookup_miss_count` vector.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `agg_chunk_total_lookup_count` vector.
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `agg_dirty_groups_count` vector.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    /// Integer gauge handle from the `agg_dirty_groups_heap_size` vector.
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    /// Counter handle from the `agg_state_cache_lookup_count` vector.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle from the `agg_state_cache_miss_count` vector.
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1793
/// Cache metric handles for distinct-aggregation deduplication, built by
/// `StreamingMetrics::new_agg_distinct_dedup_metrics` from the label values
/// (table id, actor id, fragment id).
pub struct AggDistinctDedupMetrics {
    /// Counter handle from the `agg_distinct_cache_miss_count` vector.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `agg_distinct_total_cache_count` vector.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `agg_distinct_cached_entry_count` vector.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1799
/// Cache metric handles for one temporal join executor, built by
/// `StreamingMetrics::new_temporal_join_metrics` from the label values
/// (table id, actor id, fragment id).
pub struct TemporalJoinMetrics {
    /// Counter handle from the `temporal_join_cache_miss_count` vector.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `temporal_join_total_query_cache_count` vector.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `temporal_join_cached_entry_count` vector.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1805
/// Row-count metric handles for one backfill executor, built by
/// `StreamingMetrics::new_backfill_metrics` from the label values
/// (table id, actor id).
pub struct BackfillMetrics {
    /// Counter handle from the `backfill_snapshot_read_row_count` vector.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter handle from the `backfill_upstream_output_row_count` vector.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1810
/// Row-count metric handles for one CDC backfill executor, built by
/// `StreamingMetrics::new_cdc_backfill_metrics` from the label values
/// (table id, actor id).
///
/// Unlike the neighbouring handle structs in this file, this one derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter handle from the `cdc_backfill_snapshot_read_row_count` vector.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter handle from the `cdc_backfill_upstream_output_row_count` vector.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1816
/// Metric handles for one over-window executor, built by
/// `StreamingMetrics::new_over_window_metrics` from the label values
/// (table id, actor id, fragment id).
pub struct OverWindowMetrics {
    /// Integer gauge handle from the `over_window_cached_entry_count` vector.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    /// Counter handle from the `over_window_cache_lookup_count` vector.
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle from the `over_window_cache_miss_count` vector.
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    /// Integer gauge handle from the `over_window_range_cache_entry_count` vector.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    /// Counter handle from the `over_window_range_cache_lookup_count` vector.
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle from the `over_window_range_cache_left_miss_count` vector.
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `over_window_range_cache_right_miss_count` vector.
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    /// Counter handle from the `over_window_accessed_entry_count` vector.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    /// Counter handle from the `over_window_compute_count` vector.
    pub over_window_compute_count: LabelGuardedIntCounter,
    /// Handle for `stream_over_window_same_output_count`
    /// ("Over window same output count").
    pub over_window_same_output_count: LabelGuardedIntCounter,
}