1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Metrics collected by the streaming engine.
///
/// Instances are built by `StreamingMetrics::new`, which registers each metric
/// against a Prometheus `Registry`. The process-wide instance is handed out by
/// `global_streaming_metrics`.
///
/// Fields whose type is `Relabeled*` are created with `.relabel_debug_1(level)`.
/// How their labels are exported therefore depends on the configured
/// `MetricLevel`; see `relabel_debug_1` for the exact rule.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level this instance was configured with (the `level` argument of `new`).
    pub level: MetricLevel,

    // ---- Executor ----
    /// `stream_executor_row_count`: rows output from each executor.
    /// Labels: `actor_id`, `fragment_id`, `executor_identity`.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // ---- In-memory stream-node statistics (`CountMap`, not Prometheus vectors) ----
    /// In-memory output row counts per stream node.
    /// NOTE(review): presumably filled from the `CountMap::new()` created in `new`; confirm.
    pub mem_stream_node_output_row_count: CountMap,
    /// In-memory output blocking duration (ns) per stream node.
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // ---- Per-actor runtime statistics ----
    /// `stream_actor_actor_execution_time`: total execution time (s) of an actor.
    /// Label: `actor_id`.
    actor_execution_time: LabelGuardedGaugeVec,
    // The scheduled/poll/idle metrics below are all registered with the help text
    // "tokio's metrics" and the single label `actor_id`.
    /// `stream_actor_scheduled_duration`.
    actor_scheduled_duration: LabelGuardedGaugeVec,
    /// `stream_actor_scheduled_cnt`.
    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_fast_poll_duration`.
    actor_fast_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_fast_poll_cnt`.
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_slow_poll_duration`.
    actor_slow_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_slow_poll_cnt`.
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_poll_duration`.
    actor_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_poll_cnt`.
    actor_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_idle_duration`.
    actor_idle_duration: LabelGuardedGaugeVec,
    /// `stream_actor_idle_cnt`.
    actor_idle_cnt: LabelGuardedIntGaugeVec,

    /// `stream_actor_count`: total number of actors (parallelism).
    /// Label: `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: rows received by an actor.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: rows sent by an actor.
    /// Labels: `actor_id`, `fragment_id`.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor.
    /// Labels: `actor_id`, `fragment_id`.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Source ----
    // All three are labeled by `source_id`, `source_name`, `actor_id`, `fragment_id`.
    /// `stream_source_output_rows_counts`: rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split-change events handled by a source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // ---- Sink ----
    // All three are labeled by `sink_id`, `actor_id`, `fragment_id`.
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // ---- Exchange / merge ----
    /// `stream_exchange_frag_recv_size`: size of messages received from an upstream fragment.
    /// Labels: `up_fragment_id`, `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    /// `stream_merge_barrier_align_duration`: histogram of merge barrier alignment time.
    /// Buckets: `exponential_buckets(0.0001, 2.0, 21)`. Labels: `actor_id`, `fragment_id`.
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    /// `stream_actor_output_buffer_blocking_duration_ns`: total output-buffer blocking time (ns).
    /// Labels: `actor_id`, `fragment_id`, `downstream_fragment_id`.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`: total input-buffer blocking time (ns).
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,

    // ---- Hash join ----
    /// `stream_join_lookup_miss_count`. Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    /// NOTE(review): its registered help text says "lookup miss duration" although this is a
    /// count; confirm the intended wording.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`. Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert.
    /// Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: total input-buffer waiting time (ns) of a
    /// join actor. Labels: `actor_id`, `fragment_id`.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: matching time per side.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count`: cached entries in streaming join operators.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys`: histogram of keys matched on the opposite side.
    /// Buckets: `exponential_buckets(16.0, 2.0, 28)`. Labels: `actor_id`, `fragment_id`, `table_id`.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    /// `stream_barrier_align_duration_ns`: barrier alignment time (ns).
    /// Labels: `actor_id`, `fragment_id`, `wait_side`, `executor`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Aggregation ----
    // Every aggregation metric is labeled by `table_id`, `actor_id`, `fragment_id`.
    /// `stream_agg_lookup_miss_count`.
    /// NOTE(review): its help text says "miss duration" for a count; confirm.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_lookup_total_count`.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_cached_entry_count`: cached keys in streaming aggregation operators.
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_chunk_lookup_miss_count`: chunk-level lookup misses.
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_chunk_lookup_total_count`: chunk-level lookups.
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_dirty_groups_count`: dirty groups in an aggregation executor.
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_dirty_groups_heap_size`: heap size of dirty groups.
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    /// `stream_agg_distinct_cache_miss_count`.
    /// NOTE(review): its registered help text contains the typo "dinsinct".
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_total_cache_count`.
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_cached_entry_count`: entries in the distinct-aggregation cache.
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_state_cache_lookup_count`.
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_state_cache_miss_count`.
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // ---- Group top-N (labels: `table_id`, `actor_id`, `fragment_id`) ----
    /// `stream_group_top_n_cache_miss_count`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_total_query_cache_count`.
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_cached_entry_count`.
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_group_top_n_appendonly_cache_miss_count`.
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_total_query_cache_count`.
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_cached_entry_count`.
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Lookup executor (labels: `table_id`, `actor_id`, `fragment_id`) ----
    /// `stream_lookup_cache_miss_count`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_total_query_cache_count`.
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_cached_entry_count`.
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Temporal join (labels: `table_id`, `actor_id`, `fragment_id`) ----
    /// `stream_temporal_join_cache_miss_count`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_total_query_cache_count`.
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_cached_entry_count`.
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Backfill ----
    /// `stream_backfill_snapshot_read_row_count`. Labels: `table_id`, `actor_id`.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_backfill_upstream_output_row_count`. Labels: `table_id`, `actor_id`.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    /// `stream_cdc_backfill_snapshot_read_row_count`. Labels: `table_id`, `actor_id`.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_cdc_backfill_upstream_output_row_count`. Labels: `table_id`, `actor_id`.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output from snapshot backfill.
    /// Labels: `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // ---- Over window (labels: `table_id`, `actor_id`, `fragment_id`) ----
    /// `stream_over_window_cached_entry_count`: cached entries (partitions).
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_cache_lookup_count`.
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_cache_miss_count`.
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_entry_count`.
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_range_cache_lookup_count`.
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_left_miss_count`.
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_right_miss_count`.
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_accessed_entry_count`.
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_compute_count`.
    over_window_compute_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_same_output_count`.
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // ---- Barrier (unlabeled) ----
    /// `stream_barrier_inflight_duration_seconds`. Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds`. Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size`. Buckets: `exponential_buckets(1.0, 2.0, 8)`.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: actors that have processed the earliest in-flight
    /// barriers.
    pub barrier_manager_progress: IntCounter,

    // ---- KV log store ----
    /// `kv_log_store_storage_write_count`: write row-count throughput of the kv log store.
    /// Labels: `actor_id`, `connector`, `sink_id`, `sink_name`.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Sync KV log store ----
    // Labels: `actor_id`, `target`, `fragment_id`, `relation`. The read metrics add a leading
    // `type` label and `sync_kv_log_store_state` adds a leading `state` label.
    /// `sync_kv_log_store_read_count`: read row-count throughput.
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_read_size`: read size throughput.
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_write_pause_duration_ns`: write pause time (ns).
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_state`: counter of clean/unclean state transitions.
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_wait_next_poll_ns`: total time (ns) spent waiting for the next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_count`: write row-count throughput.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_size`: write size throughput.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_buffer_unconsumed_item_count`: unconsumed items in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_row_count`: unconsumed rows in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_epoch_count`: unconsumed epochs in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_min_epoch`.
    /// NOTE(review): registered with the same help text as the epoch-count gauge
    /// ("Number of Unconsumed Epoch in buffer"), which does not describe a minimum epoch;
    /// likely a copy-paste slip, confirm.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- LRU manager and memory usage ----
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // ---- Materialize ----
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: rows streamed into materialize executors.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- PostgreSQL CDC (label: `source_id`) ----
    /// `stream_pg_cdc_state_table_lsn`: current LSN stored in the PostgreSQL CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN when the JNI commit-offset call is made.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,

    // ---- Gap fill ----
    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228 GLOBAL_STREAMING_METRICS
229 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230 .clone()
231}
232
233impl StreamingMetrics {
234 fn new(registry: &Registry, level: MetricLevel) -> Self {
235 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236 "stream_executor_row_count",
237 "Total number of rows that have been output from each executor",
238 &["actor_id", "fragment_id", "executor_identity"],
239 registry
240 )
241 .unwrap()
242 .relabel_debug_1(level);
243
244 let stream_node_output_row_count = CountMap::new();
245 let stream_node_output_blocking_duration_ns = CountMap::new();
246
247 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248 "stream_source_output_rows_counts",
249 "Total number of rows that have been output from source",
250 &["source_id", "source_name", "actor_id", "fragment_id"],
251 registry
252 )
253 .unwrap();
254
255 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256 "stream_source_split_change_event_count",
257 "Total number of split change events that have been operated by source",
258 &["source_id", "source_name", "actor_id", "fragment_id"],
259 registry
260 )
261 .unwrap();
262
263 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264 "stream_source_backfill_rows_counts",
265 "Total number of rows that have been backfilled for source",
266 &["source_id", "source_name", "actor_id", "fragment_id"],
267 registry
268 )
269 .unwrap();
270
271 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272 "stream_sink_input_row_count",
273 "Total number of rows streamed into sink executors",
274 &["sink_id", "actor_id", "fragment_id"],
275 registry
276 )
277 .unwrap();
278
279 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280 "stream_sink_input_bytes",
281 "Total size of chunks streamed into sink executors",
282 &["sink_id", "actor_id", "fragment_id"],
283 registry
284 )
285 .unwrap();
286
287 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288 "stream_mview_input_row_count",
289 "Total number of rows streamed into materialize executors",
290 &["actor_id", "table_id", "fragment_id"],
291 registry
292 )
293 .unwrap()
294 .relabel_debug_1(level);
295
296 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297 "stream_mview_current_epoch",
298 "The current epoch of the materialized executor",
299 &["actor_id", "table_id", "fragment_id"],
300 registry
301 )
302 .unwrap()
303 .relabel_debug_1(level);
304
305 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306 "stream_pg_cdc_state_table_lsn",
307 "Current LSN value stored in PostgreSQL CDC state table",
308 &["source_id"],
309 registry,
310 )
311 .unwrap();
312
313 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314 "stream_pg_cdc_jni_commit_offset_lsn",
315 "LSN value when JNI commit offset is called for PostgreSQL CDC",
316 &["source_id"],
317 registry,
318 )
319 .unwrap();
320
321 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
322 "stream_sink_chunk_buffer_size",
323 "Total size of chunks buffered in a barrier",
324 &["sink_id", "actor_id", "fragment_id"],
325 registry
326 )
327 .unwrap();
328
329 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
330 "stream_actor_actor_execution_time",
331 "Total execution time (s) of an actor",
332 &["actor_id"],
333 registry
334 )
335 .unwrap();
336
337 let actor_output_buffer_blocking_duration_ns =
338 register_guarded_int_counter_vec_with_registry!(
339 "stream_actor_output_buffer_blocking_duration_ns",
340 "Total blocking duration (ns) of output buffer",
341 &["actor_id", "fragment_id", "downstream_fragment_id"],
342 registry
343 )
344 .unwrap()
345 .relabel_debug_1(level);
347
348 let actor_input_buffer_blocking_duration_ns =
349 register_guarded_int_counter_vec_with_registry!(
350 "stream_actor_input_buffer_blocking_duration_ns",
351 "Total blocking duration (ns) of input buffer",
352 &["actor_id", "fragment_id", "upstream_fragment_id"],
353 registry
354 )
355 .unwrap()
356 .relabel_debug_1(level);
358
359 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
360 "stream_exchange_frag_recv_size",
361 "Total size of messages that have been received from upstream Fragment",
362 &["up_fragment_id", "down_fragment_id"],
363 registry
364 )
365 .unwrap();
366
367 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
368 "stream_actor_fast_poll_duration",
369 "tokio's metrics",
370 &["actor_id"],
371 registry
372 )
373 .unwrap();
374
375 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
376 "stream_actor_fast_poll_cnt",
377 "tokio's metrics",
378 &["actor_id"],
379 registry
380 )
381 .unwrap();
382
383 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
384 "stream_actor_slow_poll_duration",
385 "tokio's metrics",
386 &["actor_id"],
387 registry
388 )
389 .unwrap();
390
391 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
392 "stream_actor_slow_poll_cnt",
393 "tokio's metrics",
394 &["actor_id"],
395 registry
396 )
397 .unwrap();
398
399 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
400 "stream_actor_poll_duration",
401 "tokio's metrics",
402 &["actor_id"],
403 registry
404 )
405 .unwrap();
406
407 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
408 "stream_actor_poll_cnt",
409 "tokio's metrics",
410 &["actor_id"],
411 registry
412 )
413 .unwrap();
414
415 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
416 "stream_actor_scheduled_duration",
417 "tokio's metrics",
418 &["actor_id"],
419 registry
420 )
421 .unwrap();
422
423 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
424 "stream_actor_scheduled_cnt",
425 "tokio's metrics",
426 &["actor_id"],
427 registry
428 )
429 .unwrap();
430
431 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
432 "stream_actor_idle_duration",
433 "tokio's metrics",
434 &["actor_id"],
435 registry
436 )
437 .unwrap();
438
439 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
440 "stream_actor_idle_cnt",
441 "tokio's metrics",
442 &["actor_id"],
443 registry
444 )
445 .unwrap();
446
447 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
448 "stream_actor_in_record_cnt",
449 "Total number of rows actor received",
450 &["actor_id", "fragment_id", "upstream_fragment_id"],
451 registry
452 )
453 .unwrap()
454 .relabel_debug_1(level);
455
456 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
457 "stream_actor_out_record_cnt",
458 "Total number of rows actor sent",
459 &["actor_id", "fragment_id"],
460 registry
461 )
462 .unwrap()
463 .relabel_debug_1(level);
464
465 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
466 "stream_actor_current_epoch",
467 "Current epoch of actor",
468 &["actor_id", "fragment_id"],
469 registry
470 )
471 .unwrap()
472 .relabel_debug_1(level);
473
474 let actor_count = register_guarded_int_gauge_vec_with_registry!(
475 "stream_actor_count",
476 "Total number of actors (parallelism)",
477 &["fragment_id"],
478 registry
479 )
480 .unwrap();
481
482 let opts = histogram_opts!(
483 "stream_merge_barrier_align_duration",
484 "Duration of merge align barrier",
485 exponential_buckets(0.0001, 2.0, 21).unwrap() );
487 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
488 opts,
489 &["actor_id", "fragment_id"],
490 registry
491 )
492 .unwrap()
493 .relabel_debug_1(level);
494
495 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
496 "stream_join_lookup_miss_count",
497 "Join executor lookup miss duration",
498 &["side", "join_table_id", "actor_id", "fragment_id"],
499 registry
500 )
501 .unwrap();
502
503 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
504 "stream_join_lookup_total_count",
505 "Join executor lookup total operation",
506 &["side", "join_table_id", "actor_id", "fragment_id"],
507 registry
508 )
509 .unwrap();
510
511 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
512 "stream_join_insert_cache_miss_count",
513 "Join executor cache miss when insert operation",
514 &["side", "join_table_id", "actor_id", "fragment_id"],
515 registry
516 )
517 .unwrap();
518
519 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
520 "stream_join_actor_input_waiting_duration_ns",
521 "Total waiting duration (ns) of input buffer of join actor",
522 &["actor_id", "fragment_id"],
523 registry
524 )
525 .unwrap();
526
527 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
528 "stream_join_match_duration_ns",
529 "Matching duration for each side",
530 &["actor_id", "fragment_id", "side"],
531 registry
532 )
533 .unwrap();
534
535 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
536 "stream_barrier_align_duration_ns",
537 "Duration of join align barrier",
538 &["actor_id", "fragment_id", "wait_side", "executor"],
539 registry
540 )
541 .unwrap()
542 .relabel_debug_1(level);
543
544 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
545 "stream_join_cached_entry_count",
546 "Number of cached entries in streaming join operators",
547 &["actor_id", "fragment_id", "side"],
548 registry
549 )
550 .unwrap();
551
552 let join_matched_join_keys_opts = histogram_opts!(
553 "stream_join_matched_join_keys",
554 "The number of keys matched in the opposite side",
555 exponential_buckets(16.0, 2.0, 28).unwrap() );
557
558 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
559 join_matched_join_keys_opts,
560 &["actor_id", "fragment_id", "table_id"],
561 registry
562 )
563 .unwrap()
564 .relabel_debug_1(level);
565
566 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
567 "stream_agg_lookup_miss_count",
568 "Aggregation executor lookup miss duration",
569 &["table_id", "actor_id", "fragment_id"],
570 registry
571 )
572 .unwrap();
573
574 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
575 "stream_agg_lookup_total_count",
576 "Aggregation executor lookup total operation",
577 &["table_id", "actor_id", "fragment_id"],
578 registry
579 )
580 .unwrap();
581
582 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
583 "stream_agg_distinct_cache_miss_count",
584 "Aggregation executor dinsinct miss duration",
585 &["table_id", "actor_id", "fragment_id"],
586 registry
587 )
588 .unwrap();
589
590 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
591 "stream_agg_distinct_total_cache_count",
592 "Aggregation executor distinct total operation",
593 &["table_id", "actor_id", "fragment_id"],
594 registry
595 )
596 .unwrap();
597
598 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
599 "stream_agg_distinct_cached_entry_count",
600 "Total entry counts in distinct aggregation executor cache",
601 &["table_id", "actor_id", "fragment_id"],
602 registry
603 )
604 .unwrap();
605
606 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
607 "stream_agg_dirty_groups_count",
608 "Total dirty group counts in aggregation executor",
609 &["table_id", "actor_id", "fragment_id"],
610 registry
611 )
612 .unwrap();
613
614 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
615 "stream_agg_dirty_groups_heap_size",
616 "Total dirty group heap size in aggregation executor",
617 &["table_id", "actor_id", "fragment_id"],
618 registry
619 )
620 .unwrap();
621
622 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
623 "stream_agg_state_cache_lookup_count",
624 "Aggregation executor state cache lookup count",
625 &["table_id", "actor_id", "fragment_id"],
626 registry
627 )
628 .unwrap();
629
630 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
631 "stream_agg_state_cache_miss_count",
632 "Aggregation executor state cache miss count",
633 &["table_id", "actor_id", "fragment_id"],
634 registry
635 )
636 .unwrap();
637
638 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
639 "stream_group_top_n_cache_miss_count",
640 "Group top n executor cache miss count",
641 &["table_id", "actor_id", "fragment_id"],
642 registry
643 )
644 .unwrap();
645
646 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
647 "stream_group_top_n_total_query_cache_count",
648 "Group top n executor query cache total count",
649 &["table_id", "actor_id", "fragment_id"],
650 registry
651 )
652 .unwrap();
653
654 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
655 "stream_group_top_n_cached_entry_count",
656 "Total entry counts in group top n executor cache",
657 &["table_id", "actor_id", "fragment_id"],
658 registry
659 )
660 .unwrap();
661
662 let group_top_n_appendonly_cache_miss_count =
663 register_guarded_int_counter_vec_with_registry!(
664 "stream_group_top_n_appendonly_cache_miss_count",
665 "Group top n appendonly executor cache miss count",
666 &["table_id", "actor_id", "fragment_id"],
667 registry
668 )
669 .unwrap();
670
671 let group_top_n_appendonly_total_query_cache_count =
672 register_guarded_int_counter_vec_with_registry!(
673 "stream_group_top_n_appendonly_total_query_cache_count",
674 "Group top n appendonly executor total cache count",
675 &["table_id", "actor_id", "fragment_id"],
676 registry
677 )
678 .unwrap();
679
680 let group_top_n_appendonly_cached_entry_count =
681 register_guarded_int_gauge_vec_with_registry!(
682 "stream_group_top_n_appendonly_cached_entry_count",
683 "Total entry counts in group top n appendonly executor cache",
684 &["table_id", "actor_id", "fragment_id"],
685 registry
686 )
687 .unwrap();
688
689 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
690 "stream_lookup_cache_miss_count",
691 "Lookup executor cache miss count",
692 &["table_id", "actor_id", "fragment_id"],
693 registry
694 )
695 .unwrap();
696
697 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
698 "stream_lookup_total_query_cache_count",
699 "Lookup executor query cache total count",
700 &["table_id", "actor_id", "fragment_id"],
701 registry
702 )
703 .unwrap();
704
705 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
706 "stream_lookup_cached_entry_count",
707 "Total entry counts in lookup executor cache",
708 &["table_id", "actor_id", "fragment_id"],
709 registry
710 )
711 .unwrap();
712
713 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
714 "stream_temporal_join_cache_miss_count",
715 "Temporal join executor cache miss count",
716 &["table_id", "actor_id", "fragment_id"],
717 registry
718 )
719 .unwrap();
720
721 let temporal_join_total_query_cache_count =
722 register_guarded_int_counter_vec_with_registry!(
723 "stream_temporal_join_total_query_cache_count",
724 "Temporal join executor query cache total count",
725 &["table_id", "actor_id", "fragment_id"],
726 registry
727 )
728 .unwrap();
729
730 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
731 "stream_temporal_join_cached_entry_count",
732 "Total entry count in temporal join executor cache",
733 &["table_id", "actor_id", "fragment_id"],
734 registry
735 )
736 .unwrap();
737
738 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
739 "stream_agg_cached_entry_count",
740 "Number of cached keys in streaming aggregation operators",
741 &["table_id", "actor_id", "fragment_id"],
742 registry
743 )
744 .unwrap();
745
746 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
747 "stream_agg_chunk_lookup_miss_count",
748 "Aggregation executor chunk-level lookup miss duration",
749 &["table_id", "actor_id", "fragment_id"],
750 registry
751 )
752 .unwrap();
753
754 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
755 "stream_agg_chunk_lookup_total_count",
756 "Aggregation executor chunk-level lookup total operation",
757 &["table_id", "actor_id", "fragment_id"],
758 registry
759 )
760 .unwrap();
761
762 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
763 "stream_backfill_snapshot_read_row_count",
764 "Total number of rows that have been read from the backfill snapshot",
765 &["table_id", "actor_id"],
766 registry
767 )
768 .unwrap();
769
770 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
771 "stream_backfill_upstream_output_row_count",
772 "Total number of rows that have been output from the backfill upstream",
773 &["table_id", "actor_id"],
774 registry
775 )
776 .unwrap();
777
778 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
779 "stream_cdc_backfill_snapshot_read_row_count",
780 "Total number of rows that have been read from the cdc_backfill snapshot",
781 &["table_id", "actor_id"],
782 registry
783 )
784 .unwrap();
785
786 let cdc_backfill_upstream_output_row_count =
787 register_guarded_int_counter_vec_with_registry!(
788 "stream_cdc_backfill_upstream_output_row_count",
789 "Total number of rows that have been output from the cdc_backfill upstream",
790 &["table_id", "actor_id"],
791 registry
792 )
793 .unwrap();
794
795 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
796 "stream_snapshot_backfill_consume_snapshot_row_count",
797 "Total number of rows that have been output from snapshot backfill",
798 &["table_id", "actor_id", "stage"],
799 registry
800 )
801 .unwrap();
802
803 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
804 "stream_over_window_cached_entry_count",
805 "Total entry (partition) count in over window executor cache",
806 &["table_id", "actor_id", "fragment_id"],
807 registry
808 )
809 .unwrap();
810
811 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
812 "stream_over_window_cache_lookup_count",
813 "Over window executor cache lookup count",
814 &["table_id", "actor_id", "fragment_id"],
815 registry
816 )
817 .unwrap();
818
819 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
820 "stream_over_window_cache_miss_count",
821 "Over window executor cache miss count",
822 &["table_id", "actor_id", "fragment_id"],
823 registry
824 )
825 .unwrap();
826
827 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
828 "stream_over_window_range_cache_entry_count",
829 "Over window partition range cache entry count",
830 &["table_id", "actor_id", "fragment_id"],
831 registry,
832 )
833 .unwrap();
834
835 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
836 "stream_over_window_range_cache_lookup_count",
837 "Over window partition range cache lookup count",
838 &["table_id", "actor_id", "fragment_id"],
839 registry
840 )
841 .unwrap();
842
843 let over_window_range_cache_left_miss_count =
844 register_guarded_int_counter_vec_with_registry!(
845 "stream_over_window_range_cache_left_miss_count",
846 "Over window partition range cache left miss count",
847 &["table_id", "actor_id", "fragment_id"],
848 registry
849 )
850 .unwrap();
851
852 let over_window_range_cache_right_miss_count =
853 register_guarded_int_counter_vec_with_registry!(
854 "stream_over_window_range_cache_right_miss_count",
855 "Over window partition range cache right miss count",
856 &["table_id", "actor_id", "fragment_id"],
857 registry
858 )
859 .unwrap();
860
861 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
862 "stream_over_window_accessed_entry_count",
863 "Over window accessed entry count",
864 &["table_id", "actor_id", "fragment_id"],
865 registry
866 )
867 .unwrap();
868
869 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
870 "stream_over_window_compute_count",
871 "Over window compute count",
872 &["table_id", "actor_id", "fragment_id"],
873 registry
874 )
875 .unwrap();
876
877 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
878 "stream_over_window_same_output_count",
879 "Over window same output count",
880 &["table_id", "actor_id", "fragment_id"],
881 registry
882 )
883 .unwrap();
884
885 let opts = histogram_opts!(
886 "stream_barrier_inflight_duration_seconds",
887 "barrier_inflight_latency",
888 exponential_buckets(0.1, 1.5, 16).unwrap() );
890 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
891
892 let opts = histogram_opts!(
893 "stream_barrier_sync_storage_duration_seconds",
894 "barrier_sync_latency",
895 exponential_buckets(0.1, 1.5, 16).unwrap() );
897 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
898
899 let opts = histogram_opts!(
900 "stream_barrier_batch_size",
901 "barrier_batch_size",
902 exponential_buckets(1.0, 2.0, 8).unwrap()
903 );
904 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
905
906 let barrier_manager_progress = register_int_counter_with_registry!(
907 "stream_barrier_manager_progress",
908 "The number of actors that have processed the earliest in-flight barriers",
909 registry
910 )
911 .unwrap();
912
913 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
914 "sync_kv_log_store_wait_next_poll_ns",
915 "Total duration (ns) of waiting for next poll",
916 &["actor_id", "target", "fragment_id", "relation"],
917 registry
918 )
919 .unwrap();
920
921 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
922 "sync_kv_log_store_read_count",
923 "read row count throughput of sync_kv log store",
924 &["type", "actor_id", "target", "fragment_id", "relation"],
925 registry
926 )
927 .unwrap();
928
929 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
930 "sync_kv_log_store_read_size",
931 "read size throughput of sync_kv log store",
932 &["type", "actor_id", "target", "fragment_id", "relation"],
933 registry
934 )
935 .unwrap();
936
937 let sync_kv_log_store_write_pause_duration_ns =
938 register_guarded_int_counter_vec_with_registry!(
939 "sync_kv_log_store_write_pause_duration_ns",
940 "Duration (ns) of sync_kv log store write pause",
941 &["actor_id", "target", "fragment_id", "relation"],
942 registry
943 )
944 .unwrap();
945
946 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
947 "sync_kv_log_store_state",
948 "clean/unclean state transition for sync_kv log store",
949 &["state", "actor_id", "target", "fragment_id", "relation"],
950 registry
951 )
952 .unwrap();
953
954 let sync_kv_log_store_storage_write_count =
955 register_guarded_int_counter_vec_with_registry!(
956 "sync_kv_log_store_storage_write_count",
957 "Write row count throughput of sync_kv log store",
958 &["actor_id", "target", "fragment_id", "relation"],
959 registry
960 )
961 .unwrap();
962
963 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
964 "sync_kv_log_store_storage_write_size",
965 "Write size throughput of sync_kv log store",
966 &["actor_id", "target", "fragment_id", "relation"],
967 registry
968 )
969 .unwrap();
970
971 let sync_kv_log_store_buffer_unconsumed_item_count =
972 register_guarded_int_gauge_vec_with_registry!(
973 "sync_kv_log_store_buffer_unconsumed_item_count",
974 "Number of Unconsumed Item in buffer",
975 &["actor_id", "target", "fragment_id", "relation"],
976 registry
977 )
978 .unwrap();
979
980 let sync_kv_log_store_buffer_unconsumed_row_count =
981 register_guarded_int_gauge_vec_with_registry!(
982 "sync_kv_log_store_buffer_unconsumed_row_count",
983 "Number of Unconsumed Row in buffer",
984 &["actor_id", "target", "fragment_id", "relation"],
985 registry
986 )
987 .unwrap();
988
989 let sync_kv_log_store_buffer_unconsumed_epoch_count =
990 register_guarded_int_gauge_vec_with_registry!(
991 "sync_kv_log_store_buffer_unconsumed_epoch_count",
992 "Number of Unconsumed Epoch in buffer",
993 &["actor_id", "target", "fragment_id", "relation"],
994 registry
995 )
996 .unwrap();
997
998 let sync_kv_log_store_buffer_unconsumed_min_epoch =
999 register_guarded_int_gauge_vec_with_registry!(
1000 "sync_kv_log_store_buffer_unconsumed_min_epoch",
1001 "Number of Unconsumed Epoch in buffer",
1002 &["actor_id", "target", "fragment_id", "relation"],
1003 registry
1004 )
1005 .unwrap();
1006
1007 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1008 "kv_log_store_storage_write_count",
1009 "Write row count throughput of kv log store",
1010 &["actor_id", "connector", "sink_id", "sink_name"],
1011 registry
1012 )
1013 .unwrap();
1014
1015 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1016 "kv_log_store_storage_write_size",
1017 "Write size throughput of kv log store",
1018 &["actor_id", "connector", "sink_id", "sink_name"],
1019 registry
1020 )
1021 .unwrap();
1022
1023 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1024 "kv_log_store_storage_read_count",
1025 "Write row count throughput of kv log store",
1026 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1027 registry
1028 )
1029 .unwrap();
1030
1031 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1032 "kv_log_store_storage_read_size",
1033 "Write size throughput of kv log store",
1034 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1035 registry
1036 )
1037 .unwrap();
1038
1039 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1040 "kv_log_store_rewind_count",
1041 "Kv log store rewind rate",
1042 &["actor_id", "connector", "sink_id", "sink_name"],
1043 registry
1044 )
1045 .unwrap();
1046
1047 let kv_log_store_rewind_delay_opts = {
1048 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1049 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1050 - REWIND_BASE_DELAY.as_secs_f64().log2())
1051 .ceil() as usize;
1052 let buckets = exponential_buckets(
1053 REWIND_BASE_DELAY.as_secs_f64(),
1054 REWIND_BACKOFF_FACTOR as _,
1055 bucket_count,
1056 )
1057 .unwrap();
1058 histogram_opts!(
1059 "kv_log_store_rewind_delay",
1060 "Kv log store rewind delay",
1061 buckets,
1062 )
1063 };
1064
1065 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1066 kv_log_store_rewind_delay_opts,
1067 &["actor_id", "connector", "sink_id", "sink_name"],
1068 registry
1069 )
1070 .unwrap();
1071
1072 let kv_log_store_buffer_unconsumed_item_count =
1073 register_guarded_int_gauge_vec_with_registry!(
1074 "kv_log_store_buffer_unconsumed_item_count",
1075 "Number of Unconsumed Item in buffer",
1076 &["actor_id", "connector", "sink_id", "sink_name"],
1077 registry
1078 )
1079 .unwrap();
1080
1081 let kv_log_store_buffer_unconsumed_row_count =
1082 register_guarded_int_gauge_vec_with_registry!(
1083 "kv_log_store_buffer_unconsumed_row_count",
1084 "Number of Unconsumed Row in buffer",
1085 &["actor_id", "connector", "sink_id", "sink_name"],
1086 registry
1087 )
1088 .unwrap();
1089
1090 let kv_log_store_buffer_unconsumed_epoch_count =
1091 register_guarded_int_gauge_vec_with_registry!(
1092 "kv_log_store_buffer_unconsumed_epoch_count",
1093 "Number of Unconsumed Epoch in buffer",
1094 &["actor_id", "connector", "sink_id", "sink_name"],
1095 registry
1096 )
1097 .unwrap();
1098
1099 let kv_log_store_buffer_unconsumed_min_epoch =
1100 register_guarded_int_gauge_vec_with_registry!(
1101 "kv_log_store_buffer_unconsumed_min_epoch",
1102 "Number of Unconsumed Epoch in buffer",
1103 &["actor_id", "connector", "sink_id", "sink_name"],
1104 registry
1105 )
1106 .unwrap();
1107
1108 let lru_runtime_loop_count = register_int_counter_with_registry!(
1109 "lru_runtime_loop_count",
1110 "The counts of the eviction loop in LRU manager per second",
1111 registry
1112 )
1113 .unwrap();
1114
1115 let lru_latest_sequence = register_int_gauge_with_registry!(
1116 "lru_latest_sequence",
1117 "Current LRU global sequence",
1118 registry,
1119 )
1120 .unwrap();
1121
1122 let lru_watermark_sequence = register_int_gauge_with_registry!(
1123 "lru_watermark_sequence",
1124 "Current LRU watermark sequence",
1125 registry,
1126 )
1127 .unwrap();
1128
1129 let lru_eviction_policy = register_int_gauge_with_registry!(
1130 "lru_eviction_policy",
1131 "Current LRU eviction policy",
1132 registry,
1133 )
1134 .unwrap();
1135
1136 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1137 "jemalloc_allocated_bytes",
1138 "The allocated memory jemalloc, got from jemalloc_ctl",
1139 registry
1140 )
1141 .unwrap();
1142
1143 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1144 "jemalloc_active_bytes",
1145 "The active memory jemalloc, got from jemalloc_ctl",
1146 registry
1147 )
1148 .unwrap();
1149
1150 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1151 "jemalloc_resident_bytes",
1152 "The active memory jemalloc, got from jemalloc_ctl",
1153 registry
1154 )
1155 .unwrap();
1156
1157 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1158 "jemalloc_metadata_bytes",
1159 "The active memory jemalloc, got from jemalloc_ctl",
1160 registry
1161 )
1162 .unwrap();
1163
1164 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1165 "jvm_allocated_bytes",
1166 "The allocated jvm memory",
1167 registry
1168 )
1169 .unwrap();
1170
1171 let jvm_active_bytes = register_int_gauge_with_registry!(
1172 "jvm_active_bytes",
1173 "The active jvm memory",
1174 registry
1175 )
1176 .unwrap();
1177
1178 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1179 "stream_materialize_cache_hit_count",
1180 "Materialize executor cache hit count",
1181 &["actor_id", "table_id", "fragment_id"],
1182 registry
1183 )
1184 .unwrap()
1185 .relabel_debug_1(level);
1186
1187 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1188 "stream_materialize_data_exist_count",
1189 "Materialize executor data exist count",
1190 &["actor_id", "table_id", "fragment_id"],
1191 registry
1192 )
1193 .unwrap()
1194 .relabel_debug_1(level);
1195
1196 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1197 "stream_materialize_cache_total_count",
1198 "Materialize executor cache total operation",
1199 &["actor_id", "table_id", "fragment_id"],
1200 registry
1201 )
1202 .unwrap()
1203 .relabel_debug_1(level);
1204
1205 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1206 "stream_memory_usage",
1207 "Memory usage for stream executors",
1208 &["actor_id", "table_id", "desc"],
1209 registry
1210 )
1211 .unwrap()
1212 .relabel_debug_1(level);
1213
1214 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1215 "gap_fill_generated_rows_count",
1216 "Total number of rows generated by gap fill executor",
1217 &["actor_id", "fragment_id"],
1218 registry
1219 )
1220 .unwrap()
1221 .relabel_debug_1(level);
1222
1223 Self {
1224 level,
1225 executor_row_count,
1226 mem_stream_node_output_row_count: stream_node_output_row_count,
1227 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1228 actor_execution_time,
1229 actor_scheduled_duration,
1230 actor_scheduled_cnt,
1231 actor_fast_poll_duration,
1232 actor_fast_poll_cnt,
1233 actor_slow_poll_duration,
1234 actor_slow_poll_cnt,
1235 actor_poll_duration,
1236 actor_poll_cnt,
1237 actor_idle_duration,
1238 actor_idle_cnt,
1239 actor_count,
1240 actor_in_record_cnt,
1241 actor_out_record_cnt,
1242 actor_current_epoch,
1243 source_output_row_count,
1244 source_split_change_count,
1245 source_backfill_row_count,
1246 sink_input_row_count,
1247 sink_input_bytes,
1248 sink_chunk_buffer_size,
1249 exchange_frag_recv_size,
1250 merge_barrier_align_duration,
1251 actor_output_buffer_blocking_duration_ns,
1252 actor_input_buffer_blocking_duration_ns,
1253 join_lookup_miss_count,
1254 join_lookup_total_count,
1255 join_insert_cache_miss_count,
1256 join_actor_input_waiting_duration_ns,
1257 join_match_duration_ns,
1258 join_cached_entry_count,
1259 join_matched_join_keys,
1260 barrier_align_duration,
1261 agg_lookup_miss_count,
1262 agg_total_lookup_count,
1263 agg_cached_entry_count,
1264 agg_chunk_lookup_miss_count,
1265 agg_chunk_total_lookup_count,
1266 agg_dirty_groups_count,
1267 agg_dirty_groups_heap_size,
1268 agg_distinct_cache_miss_count,
1269 agg_distinct_total_cache_count,
1270 agg_distinct_cached_entry_count,
1271 agg_state_cache_lookup_count,
1272 agg_state_cache_miss_count,
1273 group_top_n_cache_miss_count,
1274 group_top_n_total_query_cache_count,
1275 group_top_n_cached_entry_count,
1276 group_top_n_appendonly_cache_miss_count,
1277 group_top_n_appendonly_total_query_cache_count,
1278 group_top_n_appendonly_cached_entry_count,
1279 lookup_cache_miss_count,
1280 lookup_total_query_cache_count,
1281 lookup_cached_entry_count,
1282 temporal_join_cache_miss_count,
1283 temporal_join_total_query_cache_count,
1284 temporal_join_cached_entry_count,
1285 backfill_snapshot_read_row_count,
1286 backfill_upstream_output_row_count,
1287 cdc_backfill_snapshot_read_row_count,
1288 cdc_backfill_upstream_output_row_count,
1289 snapshot_backfill_consume_row_count,
1290 over_window_cached_entry_count,
1291 over_window_cache_lookup_count,
1292 over_window_cache_miss_count,
1293 over_window_range_cache_entry_count,
1294 over_window_range_cache_lookup_count,
1295 over_window_range_cache_left_miss_count,
1296 over_window_range_cache_right_miss_count,
1297 over_window_accessed_entry_count,
1298 over_window_compute_count,
1299 over_window_same_output_count,
1300 barrier_inflight_latency,
1301 barrier_sync_latency,
1302 barrier_batch_size,
1303 barrier_manager_progress,
1304 kv_log_store_storage_write_count,
1305 kv_log_store_storage_write_size,
1306 kv_log_store_rewind_count,
1307 kv_log_store_rewind_delay,
1308 kv_log_store_storage_read_count,
1309 kv_log_store_storage_read_size,
1310 kv_log_store_buffer_unconsumed_item_count,
1311 kv_log_store_buffer_unconsumed_row_count,
1312 kv_log_store_buffer_unconsumed_epoch_count,
1313 kv_log_store_buffer_unconsumed_min_epoch,
1314 sync_kv_log_store_read_count,
1315 sync_kv_log_store_read_size,
1316 sync_kv_log_store_write_pause_duration_ns,
1317 sync_kv_log_store_state,
1318 sync_kv_log_store_wait_next_poll_ns,
1319 sync_kv_log_store_storage_write_count,
1320 sync_kv_log_store_storage_write_size,
1321 sync_kv_log_store_buffer_unconsumed_item_count,
1322 sync_kv_log_store_buffer_unconsumed_row_count,
1323 sync_kv_log_store_buffer_unconsumed_epoch_count,
1324 sync_kv_log_store_buffer_unconsumed_min_epoch,
1325 lru_runtime_loop_count,
1326 lru_latest_sequence,
1327 lru_watermark_sequence,
1328 lru_eviction_policy,
1329 jemalloc_allocated_bytes,
1330 jemalloc_active_bytes,
1331 jemalloc_resident_bytes,
1332 jemalloc_metadata_bytes,
1333 jvm_allocated_bytes,
1334 jvm_active_bytes,
1335 stream_memory_usage,
1336 materialize_cache_hit_count,
1337 materialize_data_exist_count,
1338 materialize_cache_total_count,
1339 materialize_input_row_count,
1340 materialize_current_epoch,
1341 pg_cdc_state_table_lsn,
1342 pg_cdc_jni_commit_offset_lsn,
1343 gap_fill_generated_rows_count,
1344 }
1345 }
1346
1347 pub fn unused() -> Self {
1349 global_streaming_metrics(MetricLevel::Disabled)
1350 }
1351
1352 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1353 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1354 let actor_execution_time = self
1355 .actor_execution_time
1356 .with_guarded_label_values(label_list);
1357 let actor_scheduled_duration = self
1358 .actor_scheduled_duration
1359 .with_guarded_label_values(label_list);
1360 let actor_scheduled_cnt = self
1361 .actor_scheduled_cnt
1362 .with_guarded_label_values(label_list);
1363 let actor_fast_poll_duration = self
1364 .actor_fast_poll_duration
1365 .with_guarded_label_values(label_list);
1366 let actor_fast_poll_cnt = self
1367 .actor_fast_poll_cnt
1368 .with_guarded_label_values(label_list);
1369 let actor_slow_poll_duration = self
1370 .actor_slow_poll_duration
1371 .with_guarded_label_values(label_list);
1372 let actor_slow_poll_cnt = self
1373 .actor_slow_poll_cnt
1374 .with_guarded_label_values(label_list);
1375 let actor_poll_duration = self
1376 .actor_poll_duration
1377 .with_guarded_label_values(label_list);
1378 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1379 let actor_idle_duration = self
1380 .actor_idle_duration
1381 .with_guarded_label_values(label_list);
1382 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1383 ActorMetrics {
1384 actor_execution_time,
1385 actor_scheduled_duration,
1386 actor_scheduled_cnt,
1387 actor_fast_poll_duration,
1388 actor_fast_poll_cnt,
1389 actor_slow_poll_duration,
1390 actor_slow_poll_cnt,
1391 actor_poll_duration,
1392 actor_poll_cnt,
1393 actor_idle_duration,
1394 actor_idle_cnt,
1395 }
1396 }
1397
1398 pub(crate) fn new_actor_input_metrics(
1399 &self,
1400 actor_id: ActorId,
1401 fragment_id: FragmentId,
1402 upstream_fragment_id: FragmentId,
1403 ) -> ActorInputMetrics {
1404 let actor_id_str = actor_id.to_string();
1405 let fragment_id_str = fragment_id.to_string();
1406 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1407 ActorInputMetrics {
1408 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1409 &actor_id_str,
1410 &fragment_id_str,
1411 &upstream_fragment_id_str,
1412 ]),
1413 actor_input_buffer_blocking_duration_ns: self
1414 .actor_input_buffer_blocking_duration_ns
1415 .with_guarded_label_values(&[
1416 &actor_id_str,
1417 &fragment_id_str,
1418 &upstream_fragment_id_str,
1419 ]),
1420 }
1421 }
1422
1423 pub fn new_sink_exec_metrics(
1424 &self,
1425 id: SinkId,
1426 actor_id: ActorId,
1427 fragment_id: FragmentId,
1428 ) -> SinkExecutorMetrics {
1429 let label_list: &[&str; 3] = &[
1430 &id.to_string(),
1431 &actor_id.to_string(),
1432 &fragment_id.to_string(),
1433 ];
1434 SinkExecutorMetrics {
1435 sink_input_row_count: self
1436 .sink_input_row_count
1437 .with_guarded_label_values(label_list),
1438 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1439 sink_chunk_buffer_size: self
1440 .sink_chunk_buffer_size
1441 .with_guarded_label_values(label_list),
1442 }
1443 }
1444
1445 pub fn new_group_top_n_metrics(
1446 &self,
1447 table_id: TableId,
1448 actor_id: ActorId,
1449 fragment_id: FragmentId,
1450 ) -> GroupTopNMetrics {
1451 let label_list: &[&str; 3] = &[
1452 &table_id.to_string(),
1453 &actor_id.to_string(),
1454 &fragment_id.to_string(),
1455 ];
1456
1457 GroupTopNMetrics {
1458 group_top_n_cache_miss_count: self
1459 .group_top_n_cache_miss_count
1460 .with_guarded_label_values(label_list),
1461 group_top_n_total_query_cache_count: self
1462 .group_top_n_total_query_cache_count
1463 .with_guarded_label_values(label_list),
1464 group_top_n_cached_entry_count: self
1465 .group_top_n_cached_entry_count
1466 .with_guarded_label_values(label_list),
1467 }
1468 }
1469
1470 pub fn new_append_only_group_top_n_metrics(
1471 &self,
1472 table_id: TableId,
1473 actor_id: ActorId,
1474 fragment_id: FragmentId,
1475 ) -> GroupTopNMetrics {
1476 let label_list: &[&str; 3] = &[
1477 &table_id.to_string(),
1478 &actor_id.to_string(),
1479 &fragment_id.to_string(),
1480 ];
1481
1482 GroupTopNMetrics {
1483 group_top_n_cache_miss_count: self
1484 .group_top_n_appendonly_cache_miss_count
1485 .with_guarded_label_values(label_list),
1486 group_top_n_total_query_cache_count: self
1487 .group_top_n_appendonly_total_query_cache_count
1488 .with_guarded_label_values(label_list),
1489 group_top_n_cached_entry_count: self
1490 .group_top_n_appendonly_cached_entry_count
1491 .with_guarded_label_values(label_list),
1492 }
1493 }
1494
1495 pub fn new_lookup_executor_metrics(
1496 &self,
1497 table_id: TableId,
1498 actor_id: ActorId,
1499 fragment_id: FragmentId,
1500 ) -> LookupExecutorMetrics {
1501 let label_list: &[&str; 3] = &[
1502 &table_id.to_string(),
1503 &actor_id.to_string(),
1504 &fragment_id.to_string(),
1505 ];
1506
1507 LookupExecutorMetrics {
1508 lookup_cache_miss_count: self
1509 .lookup_cache_miss_count
1510 .with_guarded_label_values(label_list),
1511 lookup_total_query_cache_count: self
1512 .lookup_total_query_cache_count
1513 .with_guarded_label_values(label_list),
1514 lookup_cached_entry_count: self
1515 .lookup_cached_entry_count
1516 .with_guarded_label_values(label_list),
1517 }
1518 }
1519
1520 pub fn new_hash_agg_metrics(
1521 &self,
1522 table_id: TableId,
1523 actor_id: ActorId,
1524 fragment_id: FragmentId,
1525 ) -> HashAggMetrics {
1526 let label_list: &[&str; 3] = &[
1527 &table_id.to_string(),
1528 &actor_id.to_string(),
1529 &fragment_id.to_string(),
1530 ];
1531 HashAggMetrics {
1532 agg_lookup_miss_count: self
1533 .agg_lookup_miss_count
1534 .with_guarded_label_values(label_list),
1535 agg_total_lookup_count: self
1536 .agg_total_lookup_count
1537 .with_guarded_label_values(label_list),
1538 agg_cached_entry_count: self
1539 .agg_cached_entry_count
1540 .with_guarded_label_values(label_list),
1541 agg_chunk_lookup_miss_count: self
1542 .agg_chunk_lookup_miss_count
1543 .with_guarded_label_values(label_list),
1544 agg_chunk_total_lookup_count: self
1545 .agg_chunk_total_lookup_count
1546 .with_guarded_label_values(label_list),
1547 agg_dirty_groups_count: self
1548 .agg_dirty_groups_count
1549 .with_guarded_label_values(label_list),
1550 agg_dirty_groups_heap_size: self
1551 .agg_dirty_groups_heap_size
1552 .with_guarded_label_values(label_list),
1553 agg_state_cache_lookup_count: self
1554 .agg_state_cache_lookup_count
1555 .with_guarded_label_values(label_list),
1556 agg_state_cache_miss_count: self
1557 .agg_state_cache_miss_count
1558 .with_guarded_label_values(label_list),
1559 }
1560 }
1561
1562 pub fn new_agg_distinct_dedup_metrics(
1563 &self,
1564 table_id: TableId,
1565 actor_id: ActorId,
1566 fragment_id: FragmentId,
1567 ) -> AggDistinctDedupMetrics {
1568 let label_list: &[&str; 3] = &[
1569 &table_id.to_string(),
1570 &actor_id.to_string(),
1571 &fragment_id.to_string(),
1572 ];
1573 AggDistinctDedupMetrics {
1574 agg_distinct_cache_miss_count: self
1575 .agg_distinct_cache_miss_count
1576 .with_guarded_label_values(label_list),
1577 agg_distinct_total_cache_count: self
1578 .agg_distinct_total_cache_count
1579 .with_guarded_label_values(label_list),
1580 agg_distinct_cached_entry_count: self
1581 .agg_distinct_cached_entry_count
1582 .with_guarded_label_values(label_list),
1583 }
1584 }
1585
1586 pub fn new_temporal_join_metrics(
1587 &self,
1588 table_id: TableId,
1589 actor_id: ActorId,
1590 fragment_id: FragmentId,
1591 ) -> TemporalJoinMetrics {
1592 let label_list: &[&str; 3] = &[
1593 &table_id.to_string(),
1594 &actor_id.to_string(),
1595 &fragment_id.to_string(),
1596 ];
1597 TemporalJoinMetrics {
1598 temporal_join_cache_miss_count: self
1599 .temporal_join_cache_miss_count
1600 .with_guarded_label_values(label_list),
1601 temporal_join_total_query_cache_count: self
1602 .temporal_join_total_query_cache_count
1603 .with_guarded_label_values(label_list),
1604 temporal_join_cached_entry_count: self
1605 .temporal_join_cached_entry_count
1606 .with_guarded_label_values(label_list),
1607 }
1608 }
1609
1610 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1611 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1612 BackfillMetrics {
1613 backfill_snapshot_read_row_count: self
1614 .backfill_snapshot_read_row_count
1615 .with_guarded_label_values(label_list),
1616 backfill_upstream_output_row_count: self
1617 .backfill_upstream_output_row_count
1618 .with_guarded_label_values(label_list),
1619 }
1620 }
1621
1622 pub fn new_cdc_backfill_metrics(
1623 &self,
1624 table_id: TableId,
1625 actor_id: ActorId,
1626 ) -> CdcBackfillMetrics {
1627 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1628 CdcBackfillMetrics {
1629 cdc_backfill_snapshot_read_row_count: self
1630 .cdc_backfill_snapshot_read_row_count
1631 .with_guarded_label_values(label_list),
1632 cdc_backfill_upstream_output_row_count: self
1633 .cdc_backfill_upstream_output_row_count
1634 .with_guarded_label_values(label_list),
1635 }
1636 }
1637
1638 pub fn new_over_window_metrics(
1639 &self,
1640 table_id: TableId,
1641 actor_id: ActorId,
1642 fragment_id: FragmentId,
1643 ) -> OverWindowMetrics {
1644 let label_list: &[&str; 3] = &[
1645 &table_id.to_string(),
1646 &actor_id.to_string(),
1647 &fragment_id.to_string(),
1648 ];
1649 OverWindowMetrics {
1650 over_window_cached_entry_count: self
1651 .over_window_cached_entry_count
1652 .with_guarded_label_values(label_list),
1653 over_window_cache_lookup_count: self
1654 .over_window_cache_lookup_count
1655 .with_guarded_label_values(label_list),
1656 over_window_cache_miss_count: self
1657 .over_window_cache_miss_count
1658 .with_guarded_label_values(label_list),
1659 over_window_range_cache_entry_count: self
1660 .over_window_range_cache_entry_count
1661 .with_guarded_label_values(label_list),
1662 over_window_range_cache_lookup_count: self
1663 .over_window_range_cache_lookup_count
1664 .with_guarded_label_values(label_list),
1665 over_window_range_cache_left_miss_count: self
1666 .over_window_range_cache_left_miss_count
1667 .with_guarded_label_values(label_list),
1668 over_window_range_cache_right_miss_count: self
1669 .over_window_range_cache_right_miss_count
1670 .with_guarded_label_values(label_list),
1671 over_window_accessed_entry_count: self
1672 .over_window_accessed_entry_count
1673 .with_guarded_label_values(label_list),
1674 over_window_compute_count: self
1675 .over_window_compute_count
1676 .with_guarded_label_values(label_list),
1677 over_window_same_output_count: self
1678 .over_window_same_output_count
1679 .with_guarded_label_values(label_list),
1680 }
1681 }
1682
1683 pub fn new_materialize_metrics(
1684 &self,
1685 table_id: TableId,
1686 actor_id: ActorId,
1687 fragment_id: FragmentId,
1688 ) -> MaterializeMetrics {
1689 let label_list: &[&str; 3] = &[
1690 &actor_id.to_string(),
1691 &table_id.to_string(),
1692 &fragment_id.to_string(),
1693 ];
1694 MaterializeMetrics {
1695 materialize_cache_hit_count: self
1696 .materialize_cache_hit_count
1697 .with_guarded_label_values(label_list),
1698 materialize_data_exist_count: self
1699 .materialize_data_exist_count
1700 .with_guarded_label_values(label_list),
1701 materialize_cache_total_count: self
1702 .materialize_cache_total_count
1703 .with_guarded_label_values(label_list),
1704 materialize_input_row_count: self
1705 .materialize_input_row_count
1706 .with_guarded_label_values(label_list),
1707 materialize_current_epoch: self
1708 .materialize_current_epoch
1709 .with_guarded_label_values(label_list),
1710 }
1711 }
1712}
1713
/// Input-side metric handles for one actor, as produced by
/// `StreamingMetrics::new_actor_input_metrics`.
///
/// Both handles carry the label values `[actor_id, fragment_id, upstream_fragment_id]`.
pub(crate) struct ActorInputMetrics {
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1718
/// Per-actor scheduling / polling metric handles, as produced by
/// `StreamingMetrics::new_actor_metrics`.
///
/// Every handle is labelled with the actor id only. `*_duration` fields are float
/// gauges (`LabelGuardedGauge`); `*_cnt` fields are integer gauges
/// (`LabelGuardedIntGauge`).
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1733
/// Metric handles for one sink executor, as produced by
/// `StreamingMetrics::new_sink_exec_metrics`.
///
/// Label values are `[sink id, actor id, fragment id]`.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    // Gauge rather than counter: can be set up or down.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1739
/// Metric handles for one materialize executor, as produced by
/// `StreamingMetrics::new_materialize_metrics`.
///
/// Label values are `[actor id, table id, fragment id]` (actor first, unlike most other
/// per-table metric groups in this file).
pub struct MaterializeMetrics {
    /// Registered as "Materialize executor cache hit count".
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Registered as "Materialize executor data exist count".
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Registered as "Materialize executor cache total operation".
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1747
/// Cache metric handles for a group top-N executor.
///
/// Produced by both `StreamingMetrics::new_group_top_n_metrics` and
/// `StreamingMetrics::new_append_only_group_top_n_metrics`; the latter fills the same
/// fields from the `group_top_n_appendonly_*` vecs. Label values are
/// `[table id, actor id, fragment id]`.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1753
/// Cache metric handles for a lookup executor, as produced by
/// `StreamingMetrics::new_lookup_executor_metrics`.
///
/// Label values are `[table id, actor id, fragment id]`.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1759
/// Metric handles for a hash aggregation executor, as produced by
/// `StreamingMetrics::new_hash_agg_metrics`.
///
/// Label values are `[table id, actor id, fragment id]`. Fields typed
/// `LabelGuardedIntCounter` only accumulate; fields typed `LabelGuardedIntGauge` hold a
/// current value.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1771
/// Cache metric handles for distinct-aggregation deduplication, as produced by
/// `StreamingMetrics::new_agg_distinct_dedup_metrics`.
///
/// Label values are `[table id, actor id, fragment id]`.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1777
/// Cache metric handles for a temporal join executor, as produced by
/// `StreamingMetrics::new_temporal_join_metrics`.
///
/// Label values are `[table id, actor id, fragment id]`.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1783
/// Row-count metric handles for a backfill executor, as produced by
/// `StreamingMetrics::new_backfill_metrics`.
///
/// Label values are `[table id, actor id]` (no fragment id).
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1788
/// Row-count metric handles for a CDC backfill executor, as produced by
/// `StreamingMetrics::new_cdc_backfill_metrics`.
///
/// Label values are `[table id, actor id]` (no fragment id). Unlike the other metric
/// groups in this file, this one derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1794
/// Cache and compute metric handles for an over-window executor, as produced by
/// `StreamingMetrics::new_over_window_metrics`.
///
/// Label values are `[table id, actor id, fragment id]`. The two `*_entry_count` fields
/// are gauges; every other field is a counter.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}