1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Metric handles used by the streaming engine.
///
/// `StreamingMetrics::new` registers every metric it builds into the
/// `Registry` it is given. [`global_streaming_metrics`] builds one instance
/// against `GLOBAL_METRICS_REGISTRY` and hands out clones of it.
///
/// Fields typed `Relabeled*` are produced in `new` by calling
/// `.relabel_debug_1(level)` on a label-guarded vec (`MetricVecRelabelExt`).
/// NOTE(review): the exact relabeling rule lives in `MetricVecRelabelExt`;
/// presumably it collapses a label unless `level` is at least debug — confirm.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level this instance was built for.
    /// Presumably the `level` passed to `new`; verify at the constructor's tail.
    pub level: MetricLevel,

    // Executor metrics
    /// `stream_executor_row_count`: total rows output from each executor.
    /// Labels: `actor_id`, `fragment_id`, `executor_identity`.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // In-memory counters (`monitor::in_mem::CountMap`) rather than
    // Prometheus-registered vecs.
    /// Stream-node output row counts.
    /// NOTE(review): presumably populated from the `CountMap` created in `new`.
    pub mem_stream_node_output_row_count: CountMap,
    /// Stream-node output blocking duration, in nanoseconds (per the `_ns` suffix).
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // Actor metrics
    /// `stream_actor_actor_execution_time`: total execution time (s) of an actor.
    /// Label: `actor_id`.
    actor_execution_time: LabelGuardedGaugeVec,
    // The ten fields below are tokio task metrics. Each is registered with
    // help text "tokio's metrics" and a single `actor_id` label, under the
    // name `stream_<field name>`.
    /// `stream_actor_scheduled_duration`.
    actor_scheduled_duration: LabelGuardedGaugeVec,
    /// `stream_actor_scheduled_cnt`.
    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_fast_poll_duration`.
    actor_fast_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_fast_poll_cnt`.
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_slow_poll_duration`.
    actor_slow_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_slow_poll_cnt`.
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_poll_duration`.
    actor_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_poll_cnt`.
    actor_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_idle_duration`.
    actor_idle_duration: LabelGuardedGaugeVec,
    /// `stream_actor_idle_cnt`.
    actor_idle_cnt: LabelGuardedIntGaugeVec,

    /// `stream_actor_count`: total number of actors (parallelism).
    /// Label: `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: total rows an actor received.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: total rows an actor sent.
    /// Labels: `actor_id`, `fragment_id`.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor.
    /// Labels: `actor_id`, `fragment_id`.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // Source metrics. All are labeled by `source_id`, `source_name`,
    // `actor_id` and `fragment_id`.
    /// `stream_source_output_rows_counts`: total rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled
    /// by a source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: total rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // Sink metrics. All are labeled by `sink_id`, `actor_id` and `fragment_id`.
    /// `stream_sink_input_row_count`: total rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink
    /// executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a
    /// barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // Exchange metrics
    /// `stream_exchange_frag_recv_size`: total size of messages received from
    /// an upstream fragment.
    /// Labels: `up_fragment_id`, `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // Merge metrics
    /// `stream_merge_barrier_align_duration`: histogram of merge barrier
    /// alignment duration.
    /// Buckets: `exponential_buckets(0.0001, 2.0, 21)`.
    /// Labels: `actor_id`, `fragment_id`.
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // Actor buffer metrics
    /// `stream_actor_output_buffer_blocking_duration_ns`: total blocking
    /// duration (ns) of the output buffer.
    /// Labels: `actor_id`, `fragment_id`, `downstream_fragment_id`.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`: total blocking
    /// duration (ns) of the input buffer.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,

    // Streaming join metrics
    /// `stream_join_lookup_miss_count`: join lookup misses.
    /// Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    /// NOTE(review): the registered help text says "miss duration", but this
    /// is a count.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`: total join lookups.
    /// Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert.
    /// Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: total waiting duration
    /// (ns) of a join actor's input buffer.
    /// Labels: `actor_id`, `fragment_id`.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: matching duration for each side.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count`: cached entries in streaming join
    /// operators.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys`: histogram of keys matched on the
    /// opposite side.
    /// Buckets: `exponential_buckets(16.0, 2.0, 28)`.
    /// Labels: `actor_id`, `fragment_id`, `table_id`.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // Barrier alignment
    /// `stream_barrier_align_duration_ns`: barrier alignment duration.
    /// Labels: `actor_id`, `fragment_id`, `wait_side`, `executor`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // Streaming aggregation metrics. All are labeled by `table_id`,
    // `actor_id` and `fragment_id`.
    /// `stream_agg_lookup_miss_count`.
    /// NOTE(review): the help text says "miss duration", but this is a count.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_lookup_total_count`.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_cached_entry_count`: cached keys in streaming aggregation.
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_chunk_lookup_miss_count`: chunk-level lookup misses.
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_chunk_lookup_total_count`: chunk-level lookups.
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_dirty_groups_count`: dirty groups in the executor.
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_dirty_groups_heap_size`: heap size of dirty groups.
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    /// `stream_agg_distinct_cache_miss_count`: distinct-dedup cache misses.
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_total_cache_count`: distinct-dedup cache lookups.
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_cached_entry_count`: distinct-dedup cache entries.
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_state_cache_lookup_count`: state cache lookups.
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_state_cache_miss_count`: state cache misses.
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // Group top-N metrics. All are labeled by `table_id`, `actor_id` and
    // `fragment_id`.
    /// `stream_group_top_n_cache_miss_count`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_total_query_cache_count`.
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_cached_entry_count`.
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_group_top_n_appendonly_cache_miss_count`.
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_total_query_cache_count`.
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_cached_entry_count`.
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // Lookup executor metrics. All are labeled by `table_id`, `actor_id` and
    // `fragment_id`.
    /// `stream_lookup_cache_miss_count`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_total_query_cache_count`.
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_cached_entry_count`.
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // Temporal join metrics. All are labeled by `table_id`, `actor_id` and
    // `fragment_id`.
    /// `stream_temporal_join_cache_miss_count`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_total_query_cache_count`.
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_cached_entry_count`.
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // Backfill metrics. Labeled by `table_id` and `actor_id` only (no
    // `fragment_id`).
    /// `stream_backfill_snapshot_read_row_count`: rows read from the backfill
    /// snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_backfill_upstream_output_row_count`: rows output from the
    /// backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // CDC backfill metrics. Labeled by `table_id` and `actor_id`.
    /// `stream_cdc_backfill_snapshot_read_row_count`.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_cdc_backfill_upstream_output_row_count`.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output
    /// from snapshot backfill.
    /// Labels: `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // Over-window executor metrics. All are labeled by `table_id`,
    // `actor_id` and `fragment_id`.
    /// `stream_over_window_cached_entry_count`: cached partitions.
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_cache_lookup_count`.
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_cache_miss_count`.
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_entry_count`: partition range cache
    /// entries.
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_range_cache_lookup_count`.
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_left_miss_count`.
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_right_miss_count`.
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_accessed_entry_count`.
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_compute_count`.
    over_window_compute_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_same_output_count`.
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // Barrier metrics: unlabeled, process-wide handles.
    /// `stream_barrier_inflight_duration_seconds`.
    /// Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds`.
    /// Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size`.
    /// Buckets: `exponential_buckets(1.0, 2.0, 8)`.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have
    /// processed the earliest in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // KV log store metrics.
    /// `kv_log_store_storage_write_count`: write row-count throughput.
    /// Labels: `actor_id`, `connector`, `sink_id`, `sink_name`.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    /// Rewind delay histogram.
    /// NOTE(review): the file imports `REWIND_BASE_DELAY`, `REWIND_MAX_DELAY`
    /// and `REWIND_BACKOFF_FACTOR`; presumably these shape this histogram's
    /// buckets — confirm in `new`.
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // Sync KV log store metrics. Every one carries the labels `actor_id`,
    // `target`, `fragment_id` and `relation`; some add one extra label, as
    // noted per field.
    /// `sync_kv_log_store_read_count`: read row-count throughput.
    /// Extra label: `type`.
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_read_size`: read size throughput.
    /// Extra label: `type`.
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_write_pause_duration_ns`: write pause duration (ns).
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_state`: clean/unclean state transitions.
    /// Extra label: `state`.
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_wait_next_poll_ns`: total time (ns) waiting for the
    /// next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_count`: write row-count throughput.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_size`: write size throughput.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_buffer_unconsumed_item_count`: unconsumed items in
    /// the buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_row_count`: unconsumed rows in the
    /// buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_epoch_count`: unconsumed epochs in
    /// the buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_min_epoch`.
    /// NOTE(review): registered with the same help text as
    /// `..._unconsumed_epoch_count` ("Number of Unconsumed Epoch in buffer").
    /// It looks copy-pasted; the name suggests the minimum unconsumed epoch.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // LRU / memory-manager and allocator metrics: unlabeled process-wide
    // `IntCounter`/`IntGauge` handles, plus the labeled `stream_memory_usage`.
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // Materialize executor metrics
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: total rows streamed into materialize
    /// executors.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // PostgreSQL CDC metrics. Both are labeled by `source_id`.
    /// `stream_pg_cdc_state_table_lsn`: current LSN stored in the PostgreSQL
    /// CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN at the time the JNI commit
    /// offset is called.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
}
221
222pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
223
224pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
225 GLOBAL_STREAMING_METRICS
226 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
227 .clone()
228}
229
230impl StreamingMetrics {
231 fn new(registry: &Registry, level: MetricLevel) -> Self {
232 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
233 "stream_executor_row_count",
234 "Total number of rows that have been output from each executor",
235 &["actor_id", "fragment_id", "executor_identity"],
236 registry
237 )
238 .unwrap()
239 .relabel_debug_1(level);
240
241 let stream_node_output_row_count = CountMap::new();
242 let stream_node_output_blocking_duration_ns = CountMap::new();
243
244 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
245 "stream_source_output_rows_counts",
246 "Total number of rows that have been output from source",
247 &["source_id", "source_name", "actor_id", "fragment_id"],
248 registry
249 )
250 .unwrap();
251
252 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
253 "stream_source_split_change_event_count",
254 "Total number of split change events that have been operated by source",
255 &["source_id", "source_name", "actor_id", "fragment_id"],
256 registry
257 )
258 .unwrap();
259
260 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
261 "stream_source_backfill_rows_counts",
262 "Total number of rows that have been backfilled for source",
263 &["source_id", "source_name", "actor_id", "fragment_id"],
264 registry
265 )
266 .unwrap();
267
268 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
269 "stream_sink_input_row_count",
270 "Total number of rows streamed into sink executors",
271 &["sink_id", "actor_id", "fragment_id"],
272 registry
273 )
274 .unwrap();
275
276 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
277 "stream_sink_input_bytes",
278 "Total size of chunks streamed into sink executors",
279 &["sink_id", "actor_id", "fragment_id"],
280 registry
281 )
282 .unwrap();
283
284 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
285 "stream_mview_input_row_count",
286 "Total number of rows streamed into materialize executors",
287 &["actor_id", "table_id", "fragment_id"],
288 registry
289 )
290 .unwrap()
291 .relabel_debug_1(level);
292
293 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
294 "stream_mview_current_epoch",
295 "The current epoch of the materialized executor",
296 &["actor_id", "table_id", "fragment_id"],
297 registry
298 )
299 .unwrap()
300 .relabel_debug_1(level);
301
302 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
303 "stream_pg_cdc_state_table_lsn",
304 "Current LSN value stored in PostgreSQL CDC state table",
305 &["source_id"],
306 registry,
307 )
308 .unwrap();
309
310 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
311 "stream_pg_cdc_jni_commit_offset_lsn",
312 "LSN value when JNI commit offset is called for PostgreSQL CDC",
313 &["source_id"],
314 registry,
315 )
316 .unwrap();
317
318 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
319 "stream_sink_chunk_buffer_size",
320 "Total size of chunks buffered in a barrier",
321 &["sink_id", "actor_id", "fragment_id"],
322 registry
323 )
324 .unwrap();
325
326 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
327 "stream_actor_actor_execution_time",
328 "Total execution time (s) of an actor",
329 &["actor_id"],
330 registry
331 )
332 .unwrap();
333
334 let actor_output_buffer_blocking_duration_ns =
335 register_guarded_int_counter_vec_with_registry!(
336 "stream_actor_output_buffer_blocking_duration_ns",
337 "Total blocking duration (ns) of output buffer",
338 &["actor_id", "fragment_id", "downstream_fragment_id"],
339 registry
340 )
341 .unwrap()
342 .relabel_debug_1(level);
344
345 let actor_input_buffer_blocking_duration_ns =
346 register_guarded_int_counter_vec_with_registry!(
347 "stream_actor_input_buffer_blocking_duration_ns",
348 "Total blocking duration (ns) of input buffer",
349 &["actor_id", "fragment_id", "upstream_fragment_id"],
350 registry
351 )
352 .unwrap();
353
354 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
355 "stream_exchange_frag_recv_size",
356 "Total size of messages that have been received from upstream Fragment",
357 &["up_fragment_id", "down_fragment_id"],
358 registry
359 )
360 .unwrap();
361
362 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
363 "stream_actor_fast_poll_duration",
364 "tokio's metrics",
365 &["actor_id"],
366 registry
367 )
368 .unwrap();
369
370 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
371 "stream_actor_fast_poll_cnt",
372 "tokio's metrics",
373 &["actor_id"],
374 registry
375 )
376 .unwrap();
377
378 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
379 "stream_actor_slow_poll_duration",
380 "tokio's metrics",
381 &["actor_id"],
382 registry
383 )
384 .unwrap();
385
386 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
387 "stream_actor_slow_poll_cnt",
388 "tokio's metrics",
389 &["actor_id"],
390 registry
391 )
392 .unwrap();
393
394 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
395 "stream_actor_poll_duration",
396 "tokio's metrics",
397 &["actor_id"],
398 registry
399 )
400 .unwrap();
401
402 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
403 "stream_actor_poll_cnt",
404 "tokio's metrics",
405 &["actor_id"],
406 registry
407 )
408 .unwrap();
409
410 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
411 "stream_actor_scheduled_duration",
412 "tokio's metrics",
413 &["actor_id"],
414 registry
415 )
416 .unwrap();
417
418 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
419 "stream_actor_scheduled_cnt",
420 "tokio's metrics",
421 &["actor_id"],
422 registry
423 )
424 .unwrap();
425
426 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
427 "stream_actor_idle_duration",
428 "tokio's metrics",
429 &["actor_id"],
430 registry
431 )
432 .unwrap();
433
434 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
435 "stream_actor_idle_cnt",
436 "tokio's metrics",
437 &["actor_id"],
438 registry
439 )
440 .unwrap();
441
442 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
443 "stream_actor_in_record_cnt",
444 "Total number of rows actor received",
445 &["actor_id", "fragment_id", "upstream_fragment_id"],
446 registry
447 )
448 .unwrap()
449 .relabel_debug_1(level);
450
451 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
452 "stream_actor_out_record_cnt",
453 "Total number of rows actor sent",
454 &["actor_id", "fragment_id"],
455 registry
456 )
457 .unwrap()
458 .relabel_debug_1(level);
459
460 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
461 "stream_actor_current_epoch",
462 "Current epoch of actor",
463 &["actor_id", "fragment_id"],
464 registry
465 )
466 .unwrap()
467 .relabel_debug_1(level);
468
469 let actor_count = register_guarded_int_gauge_vec_with_registry!(
470 "stream_actor_count",
471 "Total number of actors (parallelism)",
472 &["fragment_id"],
473 registry
474 )
475 .unwrap();
476
477 let opts = histogram_opts!(
478 "stream_merge_barrier_align_duration",
479 "Duration of merge align barrier",
480 exponential_buckets(0.0001, 2.0, 21).unwrap() );
482 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
483 opts,
484 &["actor_id", "fragment_id"],
485 registry
486 )
487 .unwrap()
488 .relabel_debug_1(level);
489
490 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
491 "stream_join_lookup_miss_count",
492 "Join executor lookup miss duration",
493 &["side", "join_table_id", "actor_id", "fragment_id"],
494 registry
495 )
496 .unwrap();
497
498 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
499 "stream_join_lookup_total_count",
500 "Join executor lookup total operation",
501 &["side", "join_table_id", "actor_id", "fragment_id"],
502 registry
503 )
504 .unwrap();
505
506 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
507 "stream_join_insert_cache_miss_count",
508 "Join executor cache miss when insert operation",
509 &["side", "join_table_id", "actor_id", "fragment_id"],
510 registry
511 )
512 .unwrap();
513
514 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
515 "stream_join_actor_input_waiting_duration_ns",
516 "Total waiting duration (ns) of input buffer of join actor",
517 &["actor_id", "fragment_id"],
518 registry
519 )
520 .unwrap();
521
522 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
523 "stream_join_match_duration_ns",
524 "Matching duration for each side",
525 &["actor_id", "fragment_id", "side"],
526 registry
527 )
528 .unwrap();
529
530 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
531 "stream_barrier_align_duration_ns",
532 "Duration of join align barrier",
533 &["actor_id", "fragment_id", "wait_side", "executor"],
534 registry
535 )
536 .unwrap()
537 .relabel_debug_1(level);
538
539 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
540 "stream_join_cached_entry_count",
541 "Number of cached entries in streaming join operators",
542 &["actor_id", "fragment_id", "side"],
543 registry
544 )
545 .unwrap();
546
547 let join_matched_join_keys_opts = histogram_opts!(
548 "stream_join_matched_join_keys",
549 "The number of keys matched in the opposite side",
550 exponential_buckets(16.0, 2.0, 28).unwrap() );
552
553 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
554 join_matched_join_keys_opts,
555 &["actor_id", "fragment_id", "table_id"],
556 registry
557 )
558 .unwrap()
559 .relabel_debug_1(level);
560
561 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
562 "stream_agg_lookup_miss_count",
563 "Aggregation executor lookup miss duration",
564 &["table_id", "actor_id", "fragment_id"],
565 registry
566 )
567 .unwrap();
568
569 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
570 "stream_agg_lookup_total_count",
571 "Aggregation executor lookup total operation",
572 &["table_id", "actor_id", "fragment_id"],
573 registry
574 )
575 .unwrap();
576
577 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
578 "stream_agg_distinct_cache_miss_count",
579 "Aggregation executor dinsinct miss duration",
580 &["table_id", "actor_id", "fragment_id"],
581 registry
582 )
583 .unwrap();
584
585 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
586 "stream_agg_distinct_total_cache_count",
587 "Aggregation executor distinct total operation",
588 &["table_id", "actor_id", "fragment_id"],
589 registry
590 )
591 .unwrap();
592
593 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
594 "stream_agg_distinct_cached_entry_count",
595 "Total entry counts in distinct aggregation executor cache",
596 &["table_id", "actor_id", "fragment_id"],
597 registry
598 )
599 .unwrap();
600
601 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
602 "stream_agg_dirty_groups_count",
603 "Total dirty group counts in aggregation executor",
604 &["table_id", "actor_id", "fragment_id"],
605 registry
606 )
607 .unwrap();
608
609 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
610 "stream_agg_dirty_groups_heap_size",
611 "Total dirty group heap size in aggregation executor",
612 &["table_id", "actor_id", "fragment_id"],
613 registry
614 )
615 .unwrap();
616
617 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
618 "stream_agg_state_cache_lookup_count",
619 "Aggregation executor state cache lookup count",
620 &["table_id", "actor_id", "fragment_id"],
621 registry
622 )
623 .unwrap();
624
625 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
626 "stream_agg_state_cache_miss_count",
627 "Aggregation executor state cache miss count",
628 &["table_id", "actor_id", "fragment_id"],
629 registry
630 )
631 .unwrap();
632
633 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
634 "stream_group_top_n_cache_miss_count",
635 "Group top n executor cache miss count",
636 &["table_id", "actor_id", "fragment_id"],
637 registry
638 )
639 .unwrap();
640
641 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
642 "stream_group_top_n_total_query_cache_count",
643 "Group top n executor query cache total count",
644 &["table_id", "actor_id", "fragment_id"],
645 registry
646 )
647 .unwrap();
648
649 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
650 "stream_group_top_n_cached_entry_count",
651 "Total entry counts in group top n executor cache",
652 &["table_id", "actor_id", "fragment_id"],
653 registry
654 )
655 .unwrap();
656
657 let group_top_n_appendonly_cache_miss_count =
658 register_guarded_int_counter_vec_with_registry!(
659 "stream_group_top_n_appendonly_cache_miss_count",
660 "Group top n appendonly executor cache miss count",
661 &["table_id", "actor_id", "fragment_id"],
662 registry
663 )
664 .unwrap();
665
666 let group_top_n_appendonly_total_query_cache_count =
667 register_guarded_int_counter_vec_with_registry!(
668 "stream_group_top_n_appendonly_total_query_cache_count",
669 "Group top n appendonly executor total cache count",
670 &["table_id", "actor_id", "fragment_id"],
671 registry
672 )
673 .unwrap();
674
675 let group_top_n_appendonly_cached_entry_count =
676 register_guarded_int_gauge_vec_with_registry!(
677 "stream_group_top_n_appendonly_cached_entry_count",
678 "Total entry counts in group top n appendonly executor cache",
679 &["table_id", "actor_id", "fragment_id"],
680 registry
681 )
682 .unwrap();
683
684 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
685 "stream_lookup_cache_miss_count",
686 "Lookup executor cache miss count",
687 &["table_id", "actor_id", "fragment_id"],
688 registry
689 )
690 .unwrap();
691
692 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
693 "stream_lookup_total_query_cache_count",
694 "Lookup executor query cache total count",
695 &["table_id", "actor_id", "fragment_id"],
696 registry
697 )
698 .unwrap();
699
700 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
701 "stream_lookup_cached_entry_count",
702 "Total entry counts in lookup executor cache",
703 &["table_id", "actor_id", "fragment_id"],
704 registry
705 )
706 .unwrap();
707
708 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
709 "stream_temporal_join_cache_miss_count",
710 "Temporal join executor cache miss count",
711 &["table_id", "actor_id", "fragment_id"],
712 registry
713 )
714 .unwrap();
715
716 let temporal_join_total_query_cache_count =
717 register_guarded_int_counter_vec_with_registry!(
718 "stream_temporal_join_total_query_cache_count",
719 "Temporal join executor query cache total count",
720 &["table_id", "actor_id", "fragment_id"],
721 registry
722 )
723 .unwrap();
724
725 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726 "stream_temporal_join_cached_entry_count",
727 "Total entry count in temporal join executor cache",
728 &["table_id", "actor_id", "fragment_id"],
729 registry
730 )
731 .unwrap();
732
733 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
734 "stream_agg_cached_entry_count",
735 "Number of cached keys in streaming aggregation operators",
736 &["table_id", "actor_id", "fragment_id"],
737 registry
738 )
739 .unwrap();
740
741 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
742 "stream_agg_chunk_lookup_miss_count",
743 "Aggregation executor chunk-level lookup miss duration",
744 &["table_id", "actor_id", "fragment_id"],
745 registry
746 )
747 .unwrap();
748
749 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
750 "stream_agg_chunk_lookup_total_count",
751 "Aggregation executor chunk-level lookup total operation",
752 &["table_id", "actor_id", "fragment_id"],
753 registry
754 )
755 .unwrap();
756
757 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
758 "stream_backfill_snapshot_read_row_count",
759 "Total number of rows that have been read from the backfill snapshot",
760 &["table_id", "actor_id"],
761 registry
762 )
763 .unwrap();
764
765 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
766 "stream_backfill_upstream_output_row_count",
767 "Total number of rows that have been output from the backfill upstream",
768 &["table_id", "actor_id"],
769 registry
770 )
771 .unwrap();
772
773 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
774 "stream_cdc_backfill_snapshot_read_row_count",
775 "Total number of rows that have been read from the cdc_backfill snapshot",
776 &["table_id", "actor_id"],
777 registry
778 )
779 .unwrap();
780
781 let cdc_backfill_upstream_output_row_count =
782 register_guarded_int_counter_vec_with_registry!(
783 "stream_cdc_backfill_upstream_output_row_count",
784 "Total number of rows that have been output from the cdc_backfill upstream",
785 &["table_id", "actor_id"],
786 registry
787 )
788 .unwrap();
789
790 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
791 "stream_snapshot_backfill_consume_snapshot_row_count",
792 "Total number of rows that have been output from snapshot backfill",
793 &["table_id", "actor_id", "stage"],
794 registry
795 )
796 .unwrap();
797
798 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
799 "stream_over_window_cached_entry_count",
800 "Total entry (partition) count in over window executor cache",
801 &["table_id", "actor_id", "fragment_id"],
802 registry
803 )
804 .unwrap();
805
806 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
807 "stream_over_window_cache_lookup_count",
808 "Over window executor cache lookup count",
809 &["table_id", "actor_id", "fragment_id"],
810 registry
811 )
812 .unwrap();
813
814 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
815 "stream_over_window_cache_miss_count",
816 "Over window executor cache miss count",
817 &["table_id", "actor_id", "fragment_id"],
818 registry
819 )
820 .unwrap();
821
822 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
823 "stream_over_window_range_cache_entry_count",
824 "Over window partition range cache entry count",
825 &["table_id", "actor_id", "fragment_id"],
826 registry,
827 )
828 .unwrap();
829
830 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
831 "stream_over_window_range_cache_lookup_count",
832 "Over window partition range cache lookup count",
833 &["table_id", "actor_id", "fragment_id"],
834 registry
835 )
836 .unwrap();
837
838 let over_window_range_cache_left_miss_count =
839 register_guarded_int_counter_vec_with_registry!(
840 "stream_over_window_range_cache_left_miss_count",
841 "Over window partition range cache left miss count",
842 &["table_id", "actor_id", "fragment_id"],
843 registry
844 )
845 .unwrap();
846
847 let over_window_range_cache_right_miss_count =
848 register_guarded_int_counter_vec_with_registry!(
849 "stream_over_window_range_cache_right_miss_count",
850 "Over window partition range cache right miss count",
851 &["table_id", "actor_id", "fragment_id"],
852 registry
853 )
854 .unwrap();
855
856 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
857 "stream_over_window_accessed_entry_count",
858 "Over window accessed entry count",
859 &["table_id", "actor_id", "fragment_id"],
860 registry
861 )
862 .unwrap();
863
864 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
865 "stream_over_window_compute_count",
866 "Over window compute count",
867 &["table_id", "actor_id", "fragment_id"],
868 registry
869 )
870 .unwrap();
871
872 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
873 "stream_over_window_same_output_count",
874 "Over window same output count",
875 &["table_id", "actor_id", "fragment_id"],
876 registry
877 )
878 .unwrap();
879
880 let opts = histogram_opts!(
881 "stream_barrier_inflight_duration_seconds",
882 "barrier_inflight_latency",
883 exponential_buckets(0.1, 1.5, 16).unwrap() );
885 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
886
887 let opts = histogram_opts!(
888 "stream_barrier_sync_storage_duration_seconds",
889 "barrier_sync_latency",
890 exponential_buckets(0.1, 1.5, 16).unwrap() );
892 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
893
894 let opts = histogram_opts!(
895 "stream_barrier_batch_size",
896 "barrier_batch_size",
897 exponential_buckets(1.0, 2.0, 8).unwrap()
898 );
899 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
900
901 let barrier_manager_progress = register_int_counter_with_registry!(
902 "stream_barrier_manager_progress",
903 "The number of actors that have processed the earliest in-flight barriers",
904 registry
905 )
906 .unwrap();
907
908 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
909 "sync_kv_log_store_wait_next_poll_ns",
910 "Total duration (ns) of waiting for next poll",
911 &["actor_id", "target", "fragment_id", "relation"],
912 registry
913 )
914 .unwrap();
915
916 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
917 "sync_kv_log_store_read_count",
918 "read row count throughput of sync_kv log store",
919 &["type", "actor_id", "target", "fragment_id", "relation"],
920 registry
921 )
922 .unwrap();
923
924 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
925 "sync_kv_log_store_read_size",
926 "read size throughput of sync_kv log store",
927 &["type", "actor_id", "target", "fragment_id", "relation"],
928 registry
929 )
930 .unwrap();
931
932 let sync_kv_log_store_write_pause_duration_ns =
933 register_guarded_int_counter_vec_with_registry!(
934 "sync_kv_log_store_write_pause_duration_ns",
935 "Duration (ns) of sync_kv log store write pause",
936 &["actor_id", "target", "fragment_id", "relation"],
937 registry
938 )
939 .unwrap();
940
941 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
942 "sync_kv_log_store_state",
943 "clean/unclean state transition for sync_kv log store",
944 &["state", "actor_id", "target", "fragment_id", "relation"],
945 registry
946 )
947 .unwrap();
948
949 let sync_kv_log_store_storage_write_count =
950 register_guarded_int_counter_vec_with_registry!(
951 "sync_kv_log_store_storage_write_count",
952 "Write row count throughput of sync_kv log store",
953 &["actor_id", "target", "fragment_id", "relation"],
954 registry
955 )
956 .unwrap();
957
958 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
959 "sync_kv_log_store_storage_write_size",
960 "Write size throughput of sync_kv log store",
961 &["actor_id", "target", "fragment_id", "relation"],
962 registry
963 )
964 .unwrap();
965
966 let sync_kv_log_store_buffer_unconsumed_item_count =
967 register_guarded_int_gauge_vec_with_registry!(
968 "sync_kv_log_store_buffer_unconsumed_item_count",
969 "Number of Unconsumed Item in buffer",
970 &["actor_id", "target", "fragment_id", "relation"],
971 registry
972 )
973 .unwrap();
974
975 let sync_kv_log_store_buffer_unconsumed_row_count =
976 register_guarded_int_gauge_vec_with_registry!(
977 "sync_kv_log_store_buffer_unconsumed_row_count",
978 "Number of Unconsumed Row in buffer",
979 &["actor_id", "target", "fragment_id", "relation"],
980 registry
981 )
982 .unwrap();
983
984 let sync_kv_log_store_buffer_unconsumed_epoch_count =
985 register_guarded_int_gauge_vec_with_registry!(
986 "sync_kv_log_store_buffer_unconsumed_epoch_count",
987 "Number of Unconsumed Epoch in buffer",
988 &["actor_id", "target", "fragment_id", "relation"],
989 registry
990 )
991 .unwrap();
992
993 let sync_kv_log_store_buffer_unconsumed_min_epoch =
994 register_guarded_int_gauge_vec_with_registry!(
995 "sync_kv_log_store_buffer_unconsumed_min_epoch",
996 "Number of Unconsumed Epoch in buffer",
997 &["actor_id", "target", "fragment_id", "relation"],
998 registry
999 )
1000 .unwrap();
1001
1002 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1003 "kv_log_store_storage_write_count",
1004 "Write row count throughput of kv log store",
1005 &["actor_id", "connector", "sink_id", "sink_name"],
1006 registry
1007 )
1008 .unwrap();
1009
1010 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1011 "kv_log_store_storage_write_size",
1012 "Write size throughput of kv log store",
1013 &["actor_id", "connector", "sink_id", "sink_name"],
1014 registry
1015 )
1016 .unwrap();
1017
1018 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1019 "kv_log_store_storage_read_count",
1020 "Write row count throughput of kv log store",
1021 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1022 registry
1023 )
1024 .unwrap();
1025
1026 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1027 "kv_log_store_storage_read_size",
1028 "Write size throughput of kv log store",
1029 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1030 registry
1031 )
1032 .unwrap();
1033
1034 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1035 "kv_log_store_rewind_count",
1036 "Kv log store rewind rate",
1037 &["actor_id", "connector", "sink_id", "sink_name"],
1038 registry
1039 )
1040 .unwrap();
1041
1042 let kv_log_store_rewind_delay_opts = {
1043 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1044 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1045 - REWIND_BASE_DELAY.as_secs_f64().log2())
1046 .ceil() as usize;
1047 let buckets = exponential_buckets(
1048 REWIND_BASE_DELAY.as_secs_f64(),
1049 REWIND_BACKOFF_FACTOR as _,
1050 bucket_count,
1051 )
1052 .unwrap();
1053 histogram_opts!(
1054 "kv_log_store_rewind_delay",
1055 "Kv log store rewind delay",
1056 buckets,
1057 )
1058 };
1059
1060 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1061 kv_log_store_rewind_delay_opts,
1062 &["actor_id", "connector", "sink_id", "sink_name"],
1063 registry
1064 )
1065 .unwrap();
1066
1067 let kv_log_store_buffer_unconsumed_item_count =
1068 register_guarded_int_gauge_vec_with_registry!(
1069 "kv_log_store_buffer_unconsumed_item_count",
1070 "Number of Unconsumed Item in buffer",
1071 &["actor_id", "connector", "sink_id", "sink_name"],
1072 registry
1073 )
1074 .unwrap();
1075
1076 let kv_log_store_buffer_unconsumed_row_count =
1077 register_guarded_int_gauge_vec_with_registry!(
1078 "kv_log_store_buffer_unconsumed_row_count",
1079 "Number of Unconsumed Row in buffer",
1080 &["actor_id", "connector", "sink_id", "sink_name"],
1081 registry
1082 )
1083 .unwrap();
1084
1085 let kv_log_store_buffer_unconsumed_epoch_count =
1086 register_guarded_int_gauge_vec_with_registry!(
1087 "kv_log_store_buffer_unconsumed_epoch_count",
1088 "Number of Unconsumed Epoch in buffer",
1089 &["actor_id", "connector", "sink_id", "sink_name"],
1090 registry
1091 )
1092 .unwrap();
1093
1094 let kv_log_store_buffer_unconsumed_min_epoch =
1095 register_guarded_int_gauge_vec_with_registry!(
1096 "kv_log_store_buffer_unconsumed_min_epoch",
1097 "Number of Unconsumed Epoch in buffer",
1098 &["actor_id", "connector", "sink_id", "sink_name"],
1099 registry
1100 )
1101 .unwrap();
1102
1103 let lru_runtime_loop_count = register_int_counter_with_registry!(
1104 "lru_runtime_loop_count",
1105 "The counts of the eviction loop in LRU manager per second",
1106 registry
1107 )
1108 .unwrap();
1109
1110 let lru_latest_sequence = register_int_gauge_with_registry!(
1111 "lru_latest_sequence",
1112 "Current LRU global sequence",
1113 registry,
1114 )
1115 .unwrap();
1116
1117 let lru_watermark_sequence = register_int_gauge_with_registry!(
1118 "lru_watermark_sequence",
1119 "Current LRU watermark sequence",
1120 registry,
1121 )
1122 .unwrap();
1123
1124 let lru_eviction_policy = register_int_gauge_with_registry!(
1125 "lru_eviction_policy",
1126 "Current LRU eviction policy",
1127 registry,
1128 )
1129 .unwrap();
1130
1131 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1132 "jemalloc_allocated_bytes",
1133 "The allocated memory jemalloc, got from jemalloc_ctl",
1134 registry
1135 )
1136 .unwrap();
1137
1138 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1139 "jemalloc_active_bytes",
1140 "The active memory jemalloc, got from jemalloc_ctl",
1141 registry
1142 )
1143 .unwrap();
1144
1145 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1146 "jemalloc_resident_bytes",
1147 "The active memory jemalloc, got from jemalloc_ctl",
1148 registry
1149 )
1150 .unwrap();
1151
1152 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1153 "jemalloc_metadata_bytes",
1154 "The active memory jemalloc, got from jemalloc_ctl",
1155 registry
1156 )
1157 .unwrap();
1158
1159 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1160 "jvm_allocated_bytes",
1161 "The allocated jvm memory",
1162 registry
1163 )
1164 .unwrap();
1165
1166 let jvm_active_bytes = register_int_gauge_with_registry!(
1167 "jvm_active_bytes",
1168 "The active jvm memory",
1169 registry
1170 )
1171 .unwrap();
1172
1173 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1174 "stream_materialize_cache_hit_count",
1175 "Materialize executor cache hit count",
1176 &["actor_id", "table_id", "fragment_id"],
1177 registry
1178 )
1179 .unwrap()
1180 .relabel_debug_1(level);
1181
1182 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1183 "stream_materialize_data_exist_count",
1184 "Materialize executor data exist count",
1185 &["actor_id", "table_id", "fragment_id"],
1186 registry
1187 )
1188 .unwrap()
1189 .relabel_debug_1(level);
1190
1191 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1192 "stream_materialize_cache_total_count",
1193 "Materialize executor cache total operation",
1194 &["actor_id", "table_id", "fragment_id"],
1195 registry
1196 )
1197 .unwrap()
1198 .relabel_debug_1(level);
1199
1200 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1201 "stream_memory_usage",
1202 "Memory usage for stream executors",
1203 &["actor_id", "table_id", "desc"],
1204 registry
1205 )
1206 .unwrap()
1207 .relabel_debug_1(level);
1208
1209 Self {
1210 level,
1211 executor_row_count,
1212 mem_stream_node_output_row_count: stream_node_output_row_count,
1213 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1214 actor_execution_time,
1215 actor_scheduled_duration,
1216 actor_scheduled_cnt,
1217 actor_fast_poll_duration,
1218 actor_fast_poll_cnt,
1219 actor_slow_poll_duration,
1220 actor_slow_poll_cnt,
1221 actor_poll_duration,
1222 actor_poll_cnt,
1223 actor_idle_duration,
1224 actor_idle_cnt,
1225 actor_count,
1226 actor_in_record_cnt,
1227 actor_out_record_cnt,
1228 actor_current_epoch,
1229 source_output_row_count,
1230 source_split_change_count,
1231 source_backfill_row_count,
1232 sink_input_row_count,
1233 sink_input_bytes,
1234 sink_chunk_buffer_size,
1235 exchange_frag_recv_size,
1236 merge_barrier_align_duration,
1237 actor_output_buffer_blocking_duration_ns,
1238 actor_input_buffer_blocking_duration_ns,
1239 join_lookup_miss_count,
1240 join_lookup_total_count,
1241 join_insert_cache_miss_count,
1242 join_actor_input_waiting_duration_ns,
1243 join_match_duration_ns,
1244 join_cached_entry_count,
1245 join_matched_join_keys,
1246 barrier_align_duration,
1247 agg_lookup_miss_count,
1248 agg_total_lookup_count,
1249 agg_cached_entry_count,
1250 agg_chunk_lookup_miss_count,
1251 agg_chunk_total_lookup_count,
1252 agg_dirty_groups_count,
1253 agg_dirty_groups_heap_size,
1254 agg_distinct_cache_miss_count,
1255 agg_distinct_total_cache_count,
1256 agg_distinct_cached_entry_count,
1257 agg_state_cache_lookup_count,
1258 agg_state_cache_miss_count,
1259 group_top_n_cache_miss_count,
1260 group_top_n_total_query_cache_count,
1261 group_top_n_cached_entry_count,
1262 group_top_n_appendonly_cache_miss_count,
1263 group_top_n_appendonly_total_query_cache_count,
1264 group_top_n_appendonly_cached_entry_count,
1265 lookup_cache_miss_count,
1266 lookup_total_query_cache_count,
1267 lookup_cached_entry_count,
1268 temporal_join_cache_miss_count,
1269 temporal_join_total_query_cache_count,
1270 temporal_join_cached_entry_count,
1271 backfill_snapshot_read_row_count,
1272 backfill_upstream_output_row_count,
1273 cdc_backfill_snapshot_read_row_count,
1274 cdc_backfill_upstream_output_row_count,
1275 snapshot_backfill_consume_row_count,
1276 over_window_cached_entry_count,
1277 over_window_cache_lookup_count,
1278 over_window_cache_miss_count,
1279 over_window_range_cache_entry_count,
1280 over_window_range_cache_lookup_count,
1281 over_window_range_cache_left_miss_count,
1282 over_window_range_cache_right_miss_count,
1283 over_window_accessed_entry_count,
1284 over_window_compute_count,
1285 over_window_same_output_count,
1286 barrier_inflight_latency,
1287 barrier_sync_latency,
1288 barrier_batch_size,
1289 barrier_manager_progress,
1290 kv_log_store_storage_write_count,
1291 kv_log_store_storage_write_size,
1292 kv_log_store_rewind_count,
1293 kv_log_store_rewind_delay,
1294 kv_log_store_storage_read_count,
1295 kv_log_store_storage_read_size,
1296 kv_log_store_buffer_unconsumed_item_count,
1297 kv_log_store_buffer_unconsumed_row_count,
1298 kv_log_store_buffer_unconsumed_epoch_count,
1299 kv_log_store_buffer_unconsumed_min_epoch,
1300 sync_kv_log_store_read_count,
1301 sync_kv_log_store_read_size,
1302 sync_kv_log_store_write_pause_duration_ns,
1303 sync_kv_log_store_state,
1304 sync_kv_log_store_wait_next_poll_ns,
1305 sync_kv_log_store_storage_write_count,
1306 sync_kv_log_store_storage_write_size,
1307 sync_kv_log_store_buffer_unconsumed_item_count,
1308 sync_kv_log_store_buffer_unconsumed_row_count,
1309 sync_kv_log_store_buffer_unconsumed_epoch_count,
1310 sync_kv_log_store_buffer_unconsumed_min_epoch,
1311 lru_runtime_loop_count,
1312 lru_latest_sequence,
1313 lru_watermark_sequence,
1314 lru_eviction_policy,
1315 jemalloc_allocated_bytes,
1316 jemalloc_active_bytes,
1317 jemalloc_resident_bytes,
1318 jemalloc_metadata_bytes,
1319 jvm_allocated_bytes,
1320 jvm_active_bytes,
1321 stream_memory_usage,
1322 materialize_cache_hit_count,
1323 materialize_data_exist_count,
1324 materialize_cache_total_count,
1325 materialize_input_row_count,
1326 materialize_current_epoch,
1327 pg_cdc_state_table_lsn,
1328 pg_cdc_jni_commit_offset_lsn,
1329 }
1330 }
1331
1332 pub fn unused() -> Self {
1334 global_streaming_metrics(MetricLevel::Disabled)
1335 }
1336
1337 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1338 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1339 let actor_execution_time = self
1340 .actor_execution_time
1341 .with_guarded_label_values(label_list);
1342 let actor_scheduled_duration = self
1343 .actor_scheduled_duration
1344 .with_guarded_label_values(label_list);
1345 let actor_scheduled_cnt = self
1346 .actor_scheduled_cnt
1347 .with_guarded_label_values(label_list);
1348 let actor_fast_poll_duration = self
1349 .actor_fast_poll_duration
1350 .with_guarded_label_values(label_list);
1351 let actor_fast_poll_cnt = self
1352 .actor_fast_poll_cnt
1353 .with_guarded_label_values(label_list);
1354 let actor_slow_poll_duration = self
1355 .actor_slow_poll_duration
1356 .with_guarded_label_values(label_list);
1357 let actor_slow_poll_cnt = self
1358 .actor_slow_poll_cnt
1359 .with_guarded_label_values(label_list);
1360 let actor_poll_duration = self
1361 .actor_poll_duration
1362 .with_guarded_label_values(label_list);
1363 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1364 let actor_idle_duration = self
1365 .actor_idle_duration
1366 .with_guarded_label_values(label_list);
1367 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1368 ActorMetrics {
1369 actor_execution_time,
1370 actor_scheduled_duration,
1371 actor_scheduled_cnt,
1372 actor_fast_poll_duration,
1373 actor_fast_poll_cnt,
1374 actor_slow_poll_duration,
1375 actor_slow_poll_cnt,
1376 actor_poll_duration,
1377 actor_poll_cnt,
1378 actor_idle_duration,
1379 actor_idle_cnt,
1380 }
1381 }
1382
1383 pub(crate) fn new_actor_input_metrics(
1384 &self,
1385 actor_id: ActorId,
1386 fragment_id: FragmentId,
1387 upstream_fragment_id: FragmentId,
1388 ) -> ActorInputMetrics {
1389 let actor_id_str = actor_id.to_string();
1390 let fragment_id_str = fragment_id.to_string();
1391 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1392 ActorInputMetrics {
1393 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1394 &actor_id_str,
1395 &fragment_id_str,
1396 &upstream_fragment_id_str,
1397 ]),
1398 actor_input_buffer_blocking_duration_ns: self
1399 .actor_input_buffer_blocking_duration_ns
1400 .with_guarded_label_values(&[
1401 &actor_id_str,
1402 &fragment_id_str,
1403 &upstream_fragment_id_str,
1404 ]),
1405 }
1406 }
1407
1408 pub fn new_sink_exec_metrics(
1409 &self,
1410 id: SinkId,
1411 actor_id: ActorId,
1412 fragment_id: FragmentId,
1413 ) -> SinkExecutorMetrics {
1414 let label_list: &[&str; 3] = &[
1415 &id.to_string(),
1416 &actor_id.to_string(),
1417 &fragment_id.to_string(),
1418 ];
1419 SinkExecutorMetrics {
1420 sink_input_row_count: self
1421 .sink_input_row_count
1422 .with_guarded_label_values(label_list),
1423 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1424 sink_chunk_buffer_size: self
1425 .sink_chunk_buffer_size
1426 .with_guarded_label_values(label_list),
1427 }
1428 }
1429
1430 pub fn new_group_top_n_metrics(
1431 &self,
1432 table_id: u32,
1433 actor_id: ActorId,
1434 fragment_id: FragmentId,
1435 ) -> GroupTopNMetrics {
1436 let label_list: &[&str; 3] = &[
1437 &table_id.to_string(),
1438 &actor_id.to_string(),
1439 &fragment_id.to_string(),
1440 ];
1441
1442 GroupTopNMetrics {
1443 group_top_n_cache_miss_count: self
1444 .group_top_n_cache_miss_count
1445 .with_guarded_label_values(label_list),
1446 group_top_n_total_query_cache_count: self
1447 .group_top_n_total_query_cache_count
1448 .with_guarded_label_values(label_list),
1449 group_top_n_cached_entry_count: self
1450 .group_top_n_cached_entry_count
1451 .with_guarded_label_values(label_list),
1452 }
1453 }
1454
1455 pub fn new_append_only_group_top_n_metrics(
1456 &self,
1457 table_id: u32,
1458 actor_id: ActorId,
1459 fragment_id: FragmentId,
1460 ) -> GroupTopNMetrics {
1461 let label_list: &[&str; 3] = &[
1462 &table_id.to_string(),
1463 &actor_id.to_string(),
1464 &fragment_id.to_string(),
1465 ];
1466
1467 GroupTopNMetrics {
1468 group_top_n_cache_miss_count: self
1469 .group_top_n_appendonly_cache_miss_count
1470 .with_guarded_label_values(label_list),
1471 group_top_n_total_query_cache_count: self
1472 .group_top_n_appendonly_total_query_cache_count
1473 .with_guarded_label_values(label_list),
1474 group_top_n_cached_entry_count: self
1475 .group_top_n_appendonly_cached_entry_count
1476 .with_guarded_label_values(label_list),
1477 }
1478 }
1479
1480 pub fn new_lookup_executor_metrics(
1481 &self,
1482 table_id: TableId,
1483 actor_id: ActorId,
1484 fragment_id: FragmentId,
1485 ) -> LookupExecutorMetrics {
1486 let label_list: &[&str; 3] = &[
1487 &table_id.to_string(),
1488 &actor_id.to_string(),
1489 &fragment_id.to_string(),
1490 ];
1491
1492 LookupExecutorMetrics {
1493 lookup_cache_miss_count: self
1494 .lookup_cache_miss_count
1495 .with_guarded_label_values(label_list),
1496 lookup_total_query_cache_count: self
1497 .lookup_total_query_cache_count
1498 .with_guarded_label_values(label_list),
1499 lookup_cached_entry_count: self
1500 .lookup_cached_entry_count
1501 .with_guarded_label_values(label_list),
1502 }
1503 }
1504
1505 pub fn new_hash_agg_metrics(
1506 &self,
1507 table_id: u32,
1508 actor_id: ActorId,
1509 fragment_id: FragmentId,
1510 ) -> HashAggMetrics {
1511 let label_list: &[&str; 3] = &[
1512 &table_id.to_string(),
1513 &actor_id.to_string(),
1514 &fragment_id.to_string(),
1515 ];
1516 HashAggMetrics {
1517 agg_lookup_miss_count: self
1518 .agg_lookup_miss_count
1519 .with_guarded_label_values(label_list),
1520 agg_total_lookup_count: self
1521 .agg_total_lookup_count
1522 .with_guarded_label_values(label_list),
1523 agg_cached_entry_count: self
1524 .agg_cached_entry_count
1525 .with_guarded_label_values(label_list),
1526 agg_chunk_lookup_miss_count: self
1527 .agg_chunk_lookup_miss_count
1528 .with_guarded_label_values(label_list),
1529 agg_chunk_total_lookup_count: self
1530 .agg_chunk_total_lookup_count
1531 .with_guarded_label_values(label_list),
1532 agg_dirty_groups_count: self
1533 .agg_dirty_groups_count
1534 .with_guarded_label_values(label_list),
1535 agg_dirty_groups_heap_size: self
1536 .agg_dirty_groups_heap_size
1537 .with_guarded_label_values(label_list),
1538 agg_state_cache_lookup_count: self
1539 .agg_state_cache_lookup_count
1540 .with_guarded_label_values(label_list),
1541 agg_state_cache_miss_count: self
1542 .agg_state_cache_miss_count
1543 .with_guarded_label_values(label_list),
1544 }
1545 }
1546
1547 pub fn new_agg_distinct_dedup_metrics(
1548 &self,
1549 table_id: u32,
1550 actor_id: ActorId,
1551 fragment_id: FragmentId,
1552 ) -> AggDistinctDedupMetrics {
1553 let label_list: &[&str; 3] = &[
1554 &table_id.to_string(),
1555 &actor_id.to_string(),
1556 &fragment_id.to_string(),
1557 ];
1558 AggDistinctDedupMetrics {
1559 agg_distinct_cache_miss_count: self
1560 .agg_distinct_cache_miss_count
1561 .with_guarded_label_values(label_list),
1562 agg_distinct_total_cache_count: self
1563 .agg_distinct_total_cache_count
1564 .with_guarded_label_values(label_list),
1565 agg_distinct_cached_entry_count: self
1566 .agg_distinct_cached_entry_count
1567 .with_guarded_label_values(label_list),
1568 }
1569 }
1570
1571 pub fn new_temporal_join_metrics(
1572 &self,
1573 table_id: TableId,
1574 actor_id: ActorId,
1575 fragment_id: FragmentId,
1576 ) -> TemporalJoinMetrics {
1577 let label_list: &[&str; 3] = &[
1578 &table_id.to_string(),
1579 &actor_id.to_string(),
1580 &fragment_id.to_string(),
1581 ];
1582 TemporalJoinMetrics {
1583 temporal_join_cache_miss_count: self
1584 .temporal_join_cache_miss_count
1585 .with_guarded_label_values(label_list),
1586 temporal_join_total_query_cache_count: self
1587 .temporal_join_total_query_cache_count
1588 .with_guarded_label_values(label_list),
1589 temporal_join_cached_entry_count: self
1590 .temporal_join_cached_entry_count
1591 .with_guarded_label_values(label_list),
1592 }
1593 }
1594
1595 pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1596 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1597 BackfillMetrics {
1598 backfill_snapshot_read_row_count: self
1599 .backfill_snapshot_read_row_count
1600 .with_guarded_label_values(label_list),
1601 backfill_upstream_output_row_count: self
1602 .backfill_upstream_output_row_count
1603 .with_guarded_label_values(label_list),
1604 }
1605 }
1606
1607 pub fn new_cdc_backfill_metrics(
1608 &self,
1609 table_id: TableId,
1610 actor_id: ActorId,
1611 ) -> CdcBackfillMetrics {
1612 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1613 CdcBackfillMetrics {
1614 cdc_backfill_snapshot_read_row_count: self
1615 .cdc_backfill_snapshot_read_row_count
1616 .with_guarded_label_values(label_list),
1617 cdc_backfill_upstream_output_row_count: self
1618 .cdc_backfill_upstream_output_row_count
1619 .with_guarded_label_values(label_list),
1620 }
1621 }
1622
1623 pub fn new_over_window_metrics(
1624 &self,
1625 table_id: u32,
1626 actor_id: ActorId,
1627 fragment_id: FragmentId,
1628 ) -> OverWindowMetrics {
1629 let label_list: &[&str; 3] = &[
1630 &table_id.to_string(),
1631 &actor_id.to_string(),
1632 &fragment_id.to_string(),
1633 ];
1634 OverWindowMetrics {
1635 over_window_cached_entry_count: self
1636 .over_window_cached_entry_count
1637 .with_guarded_label_values(label_list),
1638 over_window_cache_lookup_count: self
1639 .over_window_cache_lookup_count
1640 .with_guarded_label_values(label_list),
1641 over_window_cache_miss_count: self
1642 .over_window_cache_miss_count
1643 .with_guarded_label_values(label_list),
1644 over_window_range_cache_entry_count: self
1645 .over_window_range_cache_entry_count
1646 .with_guarded_label_values(label_list),
1647 over_window_range_cache_lookup_count: self
1648 .over_window_range_cache_lookup_count
1649 .with_guarded_label_values(label_list),
1650 over_window_range_cache_left_miss_count: self
1651 .over_window_range_cache_left_miss_count
1652 .with_guarded_label_values(label_list),
1653 over_window_range_cache_right_miss_count: self
1654 .over_window_range_cache_right_miss_count
1655 .with_guarded_label_values(label_list),
1656 over_window_accessed_entry_count: self
1657 .over_window_accessed_entry_count
1658 .with_guarded_label_values(label_list),
1659 over_window_compute_count: self
1660 .over_window_compute_count
1661 .with_guarded_label_values(label_list),
1662 over_window_same_output_count: self
1663 .over_window_same_output_count
1664 .with_guarded_label_values(label_list),
1665 }
1666 }
1667
1668 pub fn new_materialize_metrics(
1669 &self,
1670 table_id: TableId,
1671 actor_id: ActorId,
1672 fragment_id: FragmentId,
1673 ) -> MaterializeMetrics {
1674 let label_list: &[&str; 3] = &[
1675 &actor_id.to_string(),
1676 &table_id.to_string(),
1677 &fragment_id.to_string(),
1678 ];
1679 MaterializeMetrics {
1680 materialize_cache_hit_count: self
1681 .materialize_cache_hit_count
1682 .with_guarded_label_values(label_list),
1683 materialize_data_exist_count: self
1684 .materialize_data_exist_count
1685 .with_guarded_label_values(label_list),
1686 materialize_cache_total_count: self
1687 .materialize_cache_total_count
1688 .with_guarded_label_values(label_list),
1689 materialize_input_row_count: self
1690 .materialize_input_row_count
1691 .with_guarded_label_values(label_list),
1692 materialize_current_epoch: self
1693 .materialize_current_epoch
1694 .with_guarded_label_values(label_list),
1695 }
1696 }
1697}
1698
/// Input-side metric handles for one actor and one upstream fragment.
///
/// Built by `StreamingMetrics::new_actor_input_metrics`; both handles carry the labels
/// `(actor_id, fragment_id, upstream_fragment_id)`.
pub(crate) struct ActorInputMetrics {
    /// Counter of input records (presumably those received from the upstream fragment).
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Counter of time blocked on the input buffer; the `_ns` suffix suggests nanoseconds
    /// — confirm against the recording site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1703
/// Per-actor scheduling/poll metric handles, each labelled with the actor id only.
///
/// Built by `StreamingMetrics::new_actor_metrics`. Duration fields are
/// `LabelGuardedGauge`s and count fields are `LabelGuardedIntGauge`s; the units of the
/// durations are not visible here — TODO confirm at the recording site.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1718
/// Metric handles for one sink executor.
///
/// Built by `StreamingMetrics::new_sink_exec_metrics` with labels
/// `(sink_id, actor_id, fragment_id)`.
pub struct SinkExecutorMetrics {
    /// Counter of rows presumably fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter of input bytes presumably fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge of the sink chunk buffer size (unit not visible here — TODO confirm).
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1724
/// Metric handles for one materialize executor.
///
/// Built by `StreamingMetrics::new_materialize_metrics`. The label order is
/// `(actor_id, table_id, fragment_id)`, which differs from the table-first order used by
/// most other executor metrics in this module.
pub struct MaterializeMetrics {
    /// `stream_materialize_cache_hit_count`: "Materialize executor cache hit count".
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// `stream_materialize_data_exist_count`: "Materialize executor data exist count".
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// `stream_materialize_cache_total_count`: "Materialize executor cache total operation".
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    /// Counter of input rows (registered outside this view — TODO confirm metric name).
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge presumably holding the executor's current epoch.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1732
/// Cache metric handles for a group top-N executor.
///
/// Shared by `StreamingMetrics::new_group_top_n_metrics` and
/// `StreamingMetrics::new_append_only_group_top_n_metrics`; the latter fills these fields
/// from the `group_top_n_appendonly_*` vecs. Labels are `(table_id, actor_id, fragment_id)`.
pub struct GroupTopNMetrics {
    /// Counter of cache misses.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries (denominator for the miss rate).
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently cached.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1738
/// Cache metric handles for a lookup executor.
///
/// Built by `StreamingMetrics::new_lookup_executor_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct LookupExecutorMetrics {
    /// Counter of cache misses.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently cached.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1744
/// Metric handles for a hash aggregation executor.
///
/// Built by `StreamingMetrics::new_hash_agg_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct HashAggMetrics {
    // Group-level cache lookups.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    // Lookups counted per chunk (presumably one per distinct key in a chunk — TODO confirm).
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    // Gauges describing groups not yet flushed.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    // Aggregation state-cache lookups.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1756
/// Cache metric handles for the distinct-aggregation dedup cache.
///
/// Built by `StreamingMetrics::new_agg_distinct_dedup_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct AggDistinctDedupMetrics {
    /// Counter of dedup-cache misses.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total dedup-cache accesses.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently cached.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1762
/// Cache metric handles for a temporal join executor.
///
/// Built by `StreamingMetrics::new_temporal_join_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct TemporalJoinMetrics {
    /// Counter of cache misses.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter of total cache queries.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge of entries currently cached.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1768
/// Row-count metric handles for a backfill executor.
///
/// Built by `StreamingMetrics::new_backfill_metrics` with labels `(table_id, actor_id)`.
pub struct BackfillMetrics {
    /// Counter of rows read from the snapshot.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows output from upstream.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1773
/// Row-count metric handles for a CDC backfill executor.
///
/// Built by `StreamingMetrics::new_cdc_backfill_metrics` with labels `(table_id, actor_id)`.
/// Unlike the other handle structs here, this one derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter of rows read from the CDC snapshot.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter of rows output from upstream.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1779
/// Cache and compute metric handles for an over-window executor.
///
/// Built by `StreamingMetrics::new_over_window_metrics` with labels
/// `(table_id, actor_id, fragment_id)`.
pub struct OverWindowMetrics {
    // Partition-level cache.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    // Range cache; misses are tracked separately for the left and right ends.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    // Compute-side counters.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}