1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
43#[derive(Clone)]
44pub struct StreamingMetrics {
45 pub level: MetricLevel,
46
47 pub executor_row_count: RelabeledGuardedIntCounterVec,
49
50 pub mem_stream_node_output_row_count: CountMap,
54 pub mem_stream_node_output_blocking_duration_ns: CountMap,
55
56 actor_execution_time: LabelGuardedGaugeVec,
58 actor_scheduled_duration: LabelGuardedGaugeVec,
59 actor_scheduled_cnt: LabelGuardedIntGaugeVec,
60 actor_fast_poll_duration: LabelGuardedGaugeVec,
61 actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
62 actor_slow_poll_duration: LabelGuardedGaugeVec,
63 actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
64 actor_poll_duration: LabelGuardedGaugeVec,
65 actor_poll_cnt: LabelGuardedIntGaugeVec,
66 actor_idle_duration: LabelGuardedGaugeVec,
67 actor_idle_cnt: LabelGuardedIntGaugeVec,
68
69 pub actor_count: LabelGuardedIntGaugeVec,
71 pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
72 pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
73 pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
74
75 pub source_output_row_count: LabelGuardedIntCounterVec,
77 pub source_split_change_count: LabelGuardedIntCounterVec,
78 pub source_backfill_row_count: LabelGuardedIntCounterVec,
79
80 sink_input_row_count: LabelGuardedIntCounterVec,
82 sink_input_bytes: LabelGuardedIntCounterVec,
83 sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
84
85 pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
87
88 pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,
91
92 pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
94 actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,
95
96 pub join_lookup_miss_count: LabelGuardedIntCounterVec,
98 pub join_lookup_total_count: LabelGuardedIntCounterVec,
99 pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
100 pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
101 pub join_match_duration_ns: LabelGuardedIntCounterVec,
102 pub join_cached_entry_count: LabelGuardedIntGaugeVec,
103 pub join_matched_join_keys: RelabeledGuardedHistogramVec,
104
105 pub barrier_align_duration: RelabeledGuardedIntCounterVec,
107
108 agg_lookup_miss_count: LabelGuardedIntCounterVec,
110 agg_total_lookup_count: LabelGuardedIntCounterVec,
111 agg_cached_entry_count: LabelGuardedIntGaugeVec,
112 agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
113 agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
114 agg_dirty_groups_count: LabelGuardedIntGaugeVec,
115 agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
116 agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
117 agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
118 agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
119 agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
120 agg_state_cache_miss_count: LabelGuardedIntCounterVec,
121
122 group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
124 group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
125 group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
126 group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
128 group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
129 group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
130
131 lookup_cache_miss_count: LabelGuardedIntCounterVec,
133 lookup_total_query_cache_count: LabelGuardedIntCounterVec,
134 lookup_cached_entry_count: LabelGuardedIntGaugeVec,
135
136 temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
138 temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
139 temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
140
141 backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143 backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145 cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
147 cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
148
149 pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
151
152 over_window_cached_entry_count: LabelGuardedIntGaugeVec,
154 over_window_cache_lookup_count: LabelGuardedIntCounterVec,
155 over_window_cache_miss_count: LabelGuardedIntCounterVec,
156 over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
157 over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
158 over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
159 over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
160 over_window_accessed_entry_count: LabelGuardedIntCounterVec,
161 over_window_compute_count: LabelGuardedIntCounterVec,
162 over_window_same_output_count: LabelGuardedIntCounterVec,
163
164 pub barrier_inflight_latency: Histogram,
168 pub barrier_sync_latency: Histogram,
170 pub barrier_batch_size: Histogram,
171 pub barrier_manager_progress: IntCounter,
173
174 pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
175 pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
176 pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
177 pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
178 pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
179 pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
180 pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
181 pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
182 pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
183 pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
184
185 pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186 pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187 pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188 pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189 pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190 pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191 pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192 pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193 pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194 pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195 pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196
197 pub lru_runtime_loop_count: IntCounter,
199 pub lru_latest_sequence: IntGauge,
200 pub lru_watermark_sequence: IntGauge,
201 pub lru_eviction_policy: IntGauge,
202 pub jemalloc_allocated_bytes: IntGauge,
203 pub jemalloc_active_bytes: IntGauge,
204 pub jemalloc_resident_bytes: IntGauge,
205 pub jemalloc_metadata_bytes: IntGauge,
206 pub jvm_allocated_bytes: IntGauge,
207 pub jvm_active_bytes: IntGauge,
208 pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
209
210 materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
212 materialize_data_exist_count: RelabeledGuardedIntCounterVec,
213 materialize_cache_total_count: RelabeledGuardedIntCounterVec,
214 materialize_input_row_count: RelabeledGuardedIntCounterVec,
215 pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
216
217 pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
219 pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
220
221 pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228 GLOBAL_STREAMING_METRICS
229 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230 .clone()
231}
232
233impl StreamingMetrics {
234 fn new(registry: &Registry, level: MetricLevel) -> Self {
235 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236 "stream_executor_row_count",
237 "Total number of rows that have been output from each executor",
238 &["actor_id", "fragment_id", "executor_identity"],
239 registry
240 )
241 .unwrap()
242 .relabel_debug_1(level);
243
244 let stream_node_output_row_count = CountMap::new();
245 let stream_node_output_blocking_duration_ns = CountMap::new();
246
247 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248 "stream_source_output_rows_counts",
249 "Total number of rows that have been output from source",
250 &["source_id", "source_name", "actor_id", "fragment_id"],
251 registry
252 )
253 .unwrap();
254
255 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256 "stream_source_split_change_event_count",
257 "Total number of split change events that have been operated by source",
258 &["source_id", "source_name", "actor_id", "fragment_id"],
259 registry
260 )
261 .unwrap();
262
263 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264 "stream_source_backfill_rows_counts",
265 "Total number of rows that have been backfilled for source",
266 &["source_id", "source_name", "actor_id", "fragment_id"],
267 registry
268 )
269 .unwrap();
270
271 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272 "stream_sink_input_row_count",
273 "Total number of rows streamed into sink executors",
274 &["sink_id", "actor_id", "fragment_id"],
275 registry
276 )
277 .unwrap();
278
279 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280 "stream_sink_input_bytes",
281 "Total size of chunks streamed into sink executors",
282 &["sink_id", "actor_id", "fragment_id"],
283 registry
284 )
285 .unwrap();
286
287 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288 "stream_mview_input_row_count",
289 "Total number of rows streamed into materialize executors",
290 &["actor_id", "table_id", "fragment_id"],
291 registry
292 )
293 .unwrap()
294 .relabel_debug_1(level);
295
296 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297 "stream_mview_current_epoch",
298 "The current epoch of the materialized executor",
299 &["actor_id", "table_id", "fragment_id"],
300 registry
301 )
302 .unwrap()
303 .relabel_debug_1(level);
304
305 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306 "stream_pg_cdc_state_table_lsn",
307 "Current LSN value stored in PostgreSQL CDC state table",
308 &["source_id"],
309 registry,
310 )
311 .unwrap();
312
313 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314 "stream_pg_cdc_jni_commit_offset_lsn",
315 "LSN value when JNI commit offset is called for PostgreSQL CDC",
316 &["source_id"],
317 registry,
318 )
319 .unwrap();
320
321 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
322 "stream_sink_chunk_buffer_size",
323 "Total size of chunks buffered in a barrier",
324 &["sink_id", "actor_id", "fragment_id"],
325 registry
326 )
327 .unwrap();
328
329 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
330 "stream_actor_actor_execution_time",
331 "Total execution time (s) of an actor",
332 &["actor_id"],
333 registry
334 )
335 .unwrap();
336
337 let actor_output_buffer_blocking_duration_ns =
338 register_guarded_int_counter_vec_with_registry!(
339 "stream_actor_output_buffer_blocking_duration_ns",
340 "Total blocking duration (ns) of output buffer",
341 &["actor_id", "fragment_id", "downstream_fragment_id"],
342 registry
343 )
344 .unwrap()
345 .relabel_debug_1(level);
347
348 let actor_input_buffer_blocking_duration_ns =
349 register_guarded_int_counter_vec_with_registry!(
350 "stream_actor_input_buffer_blocking_duration_ns",
351 "Total blocking duration (ns) of input buffer",
352 &["actor_id", "fragment_id", "upstream_fragment_id"],
353 registry
354 )
355 .unwrap();
356
357 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
358 "stream_exchange_frag_recv_size",
359 "Total size of messages that have been received from upstream Fragment",
360 &["up_fragment_id", "down_fragment_id"],
361 registry
362 )
363 .unwrap();
364
365 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
366 "stream_actor_fast_poll_duration",
367 "tokio's metrics",
368 &["actor_id"],
369 registry
370 )
371 .unwrap();
372
373 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
374 "stream_actor_fast_poll_cnt",
375 "tokio's metrics",
376 &["actor_id"],
377 registry
378 )
379 .unwrap();
380
381 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
382 "stream_actor_slow_poll_duration",
383 "tokio's metrics",
384 &["actor_id"],
385 registry
386 )
387 .unwrap();
388
389 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
390 "stream_actor_slow_poll_cnt",
391 "tokio's metrics",
392 &["actor_id"],
393 registry
394 )
395 .unwrap();
396
397 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
398 "stream_actor_poll_duration",
399 "tokio's metrics",
400 &["actor_id"],
401 registry
402 )
403 .unwrap();
404
405 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
406 "stream_actor_poll_cnt",
407 "tokio's metrics",
408 &["actor_id"],
409 registry
410 )
411 .unwrap();
412
413 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
414 "stream_actor_scheduled_duration",
415 "tokio's metrics",
416 &["actor_id"],
417 registry
418 )
419 .unwrap();
420
421 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
422 "stream_actor_scheduled_cnt",
423 "tokio's metrics",
424 &["actor_id"],
425 registry
426 )
427 .unwrap();
428
429 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
430 "stream_actor_idle_duration",
431 "tokio's metrics",
432 &["actor_id"],
433 registry
434 )
435 .unwrap();
436
437 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
438 "stream_actor_idle_cnt",
439 "tokio's metrics",
440 &["actor_id"],
441 registry
442 )
443 .unwrap();
444
445 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
446 "stream_actor_in_record_cnt",
447 "Total number of rows actor received",
448 &["actor_id", "fragment_id", "upstream_fragment_id"],
449 registry
450 )
451 .unwrap()
452 .relabel_debug_1(level);
453
454 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
455 "stream_actor_out_record_cnt",
456 "Total number of rows actor sent",
457 &["actor_id", "fragment_id"],
458 registry
459 )
460 .unwrap()
461 .relabel_debug_1(level);
462
463 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
464 "stream_actor_current_epoch",
465 "Current epoch of actor",
466 &["actor_id", "fragment_id"],
467 registry
468 )
469 .unwrap()
470 .relabel_debug_1(level);
471
472 let actor_count = register_guarded_int_gauge_vec_with_registry!(
473 "stream_actor_count",
474 "Total number of actors (parallelism)",
475 &["fragment_id"],
476 registry
477 )
478 .unwrap();
479
480 let opts = histogram_opts!(
481 "stream_merge_barrier_align_duration",
482 "Duration of merge align barrier",
483 exponential_buckets(0.0001, 2.0, 21).unwrap() );
485 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
486 opts,
487 &["actor_id", "fragment_id"],
488 registry
489 )
490 .unwrap()
491 .relabel_debug_1(level);
492
493 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
494 "stream_join_lookup_miss_count",
495 "Join executor lookup miss duration",
496 &["side", "join_table_id", "actor_id", "fragment_id"],
497 registry
498 )
499 .unwrap();
500
501 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
502 "stream_join_lookup_total_count",
503 "Join executor lookup total operation",
504 &["side", "join_table_id", "actor_id", "fragment_id"],
505 registry
506 )
507 .unwrap();
508
509 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
510 "stream_join_insert_cache_miss_count",
511 "Join executor cache miss when insert operation",
512 &["side", "join_table_id", "actor_id", "fragment_id"],
513 registry
514 )
515 .unwrap();
516
517 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
518 "stream_join_actor_input_waiting_duration_ns",
519 "Total waiting duration (ns) of input buffer of join actor",
520 &["actor_id", "fragment_id"],
521 registry
522 )
523 .unwrap();
524
525 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
526 "stream_join_match_duration_ns",
527 "Matching duration for each side",
528 &["actor_id", "fragment_id", "side"],
529 registry
530 )
531 .unwrap();
532
533 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
534 "stream_barrier_align_duration_ns",
535 "Duration of join align barrier",
536 &["actor_id", "fragment_id", "wait_side", "executor"],
537 registry
538 )
539 .unwrap()
540 .relabel_debug_1(level);
541
542 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
543 "stream_join_cached_entry_count",
544 "Number of cached entries in streaming join operators",
545 &["actor_id", "fragment_id", "side"],
546 registry
547 )
548 .unwrap();
549
550 let join_matched_join_keys_opts = histogram_opts!(
551 "stream_join_matched_join_keys",
552 "The number of keys matched in the opposite side",
553 exponential_buckets(16.0, 2.0, 28).unwrap() );
555
556 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
557 join_matched_join_keys_opts,
558 &["actor_id", "fragment_id", "table_id"],
559 registry
560 )
561 .unwrap()
562 .relabel_debug_1(level);
563
564 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
565 "stream_agg_lookup_miss_count",
566 "Aggregation executor lookup miss duration",
567 &["table_id", "actor_id", "fragment_id"],
568 registry
569 )
570 .unwrap();
571
572 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
573 "stream_agg_lookup_total_count",
574 "Aggregation executor lookup total operation",
575 &["table_id", "actor_id", "fragment_id"],
576 registry
577 )
578 .unwrap();
579
580 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
581 "stream_agg_distinct_cache_miss_count",
582 "Aggregation executor dinsinct miss duration",
583 &["table_id", "actor_id", "fragment_id"],
584 registry
585 )
586 .unwrap();
587
588 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
589 "stream_agg_distinct_total_cache_count",
590 "Aggregation executor distinct total operation",
591 &["table_id", "actor_id", "fragment_id"],
592 registry
593 )
594 .unwrap();
595
596 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
597 "stream_agg_distinct_cached_entry_count",
598 "Total entry counts in distinct aggregation executor cache",
599 &["table_id", "actor_id", "fragment_id"],
600 registry
601 )
602 .unwrap();
603
604 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
605 "stream_agg_dirty_groups_count",
606 "Total dirty group counts in aggregation executor",
607 &["table_id", "actor_id", "fragment_id"],
608 registry
609 )
610 .unwrap();
611
612 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
613 "stream_agg_dirty_groups_heap_size",
614 "Total dirty group heap size in aggregation executor",
615 &["table_id", "actor_id", "fragment_id"],
616 registry
617 )
618 .unwrap();
619
620 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
621 "stream_agg_state_cache_lookup_count",
622 "Aggregation executor state cache lookup count",
623 &["table_id", "actor_id", "fragment_id"],
624 registry
625 )
626 .unwrap();
627
628 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
629 "stream_agg_state_cache_miss_count",
630 "Aggregation executor state cache miss count",
631 &["table_id", "actor_id", "fragment_id"],
632 registry
633 )
634 .unwrap();
635
636 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
637 "stream_group_top_n_cache_miss_count",
638 "Group top n executor cache miss count",
639 &["table_id", "actor_id", "fragment_id"],
640 registry
641 )
642 .unwrap();
643
644 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
645 "stream_group_top_n_total_query_cache_count",
646 "Group top n executor query cache total count",
647 &["table_id", "actor_id", "fragment_id"],
648 registry
649 )
650 .unwrap();
651
652 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
653 "stream_group_top_n_cached_entry_count",
654 "Total entry counts in group top n executor cache",
655 &["table_id", "actor_id", "fragment_id"],
656 registry
657 )
658 .unwrap();
659
660 let group_top_n_appendonly_cache_miss_count =
661 register_guarded_int_counter_vec_with_registry!(
662 "stream_group_top_n_appendonly_cache_miss_count",
663 "Group top n appendonly executor cache miss count",
664 &["table_id", "actor_id", "fragment_id"],
665 registry
666 )
667 .unwrap();
668
669 let group_top_n_appendonly_total_query_cache_count =
670 register_guarded_int_counter_vec_with_registry!(
671 "stream_group_top_n_appendonly_total_query_cache_count",
672 "Group top n appendonly executor total cache count",
673 &["table_id", "actor_id", "fragment_id"],
674 registry
675 )
676 .unwrap();
677
678 let group_top_n_appendonly_cached_entry_count =
679 register_guarded_int_gauge_vec_with_registry!(
680 "stream_group_top_n_appendonly_cached_entry_count",
681 "Total entry counts in group top n appendonly executor cache",
682 &["table_id", "actor_id", "fragment_id"],
683 registry
684 )
685 .unwrap();
686
687 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
688 "stream_lookup_cache_miss_count",
689 "Lookup executor cache miss count",
690 &["table_id", "actor_id", "fragment_id"],
691 registry
692 )
693 .unwrap();
694
695 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
696 "stream_lookup_total_query_cache_count",
697 "Lookup executor query cache total count",
698 &["table_id", "actor_id", "fragment_id"],
699 registry
700 )
701 .unwrap();
702
703 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
704 "stream_lookup_cached_entry_count",
705 "Total entry counts in lookup executor cache",
706 &["table_id", "actor_id", "fragment_id"],
707 registry
708 )
709 .unwrap();
710
711 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
712 "stream_temporal_join_cache_miss_count",
713 "Temporal join executor cache miss count",
714 &["table_id", "actor_id", "fragment_id"],
715 registry
716 )
717 .unwrap();
718
719 let temporal_join_total_query_cache_count =
720 register_guarded_int_counter_vec_with_registry!(
721 "stream_temporal_join_total_query_cache_count",
722 "Temporal join executor query cache total count",
723 &["table_id", "actor_id", "fragment_id"],
724 registry
725 )
726 .unwrap();
727
728 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
729 "stream_temporal_join_cached_entry_count",
730 "Total entry count in temporal join executor cache",
731 &["table_id", "actor_id", "fragment_id"],
732 registry
733 )
734 .unwrap();
735
736 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
737 "stream_agg_cached_entry_count",
738 "Number of cached keys in streaming aggregation operators",
739 &["table_id", "actor_id", "fragment_id"],
740 registry
741 )
742 .unwrap();
743
744 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
745 "stream_agg_chunk_lookup_miss_count",
746 "Aggregation executor chunk-level lookup miss duration",
747 &["table_id", "actor_id", "fragment_id"],
748 registry
749 )
750 .unwrap();
751
752 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
753 "stream_agg_chunk_lookup_total_count",
754 "Aggregation executor chunk-level lookup total operation",
755 &["table_id", "actor_id", "fragment_id"],
756 registry
757 )
758 .unwrap();
759
760 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
761 "stream_backfill_snapshot_read_row_count",
762 "Total number of rows that have been read from the backfill snapshot",
763 &["table_id", "actor_id"],
764 registry
765 )
766 .unwrap();
767
768 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
769 "stream_backfill_upstream_output_row_count",
770 "Total number of rows that have been output from the backfill upstream",
771 &["table_id", "actor_id"],
772 registry
773 )
774 .unwrap();
775
776 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
777 "stream_cdc_backfill_snapshot_read_row_count",
778 "Total number of rows that have been read from the cdc_backfill snapshot",
779 &["table_id", "actor_id"],
780 registry
781 )
782 .unwrap();
783
784 let cdc_backfill_upstream_output_row_count =
785 register_guarded_int_counter_vec_with_registry!(
786 "stream_cdc_backfill_upstream_output_row_count",
787 "Total number of rows that have been output from the cdc_backfill upstream",
788 &["table_id", "actor_id"],
789 registry
790 )
791 .unwrap();
792
793 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
794 "stream_snapshot_backfill_consume_snapshot_row_count",
795 "Total number of rows that have been output from snapshot backfill",
796 &["table_id", "actor_id", "stage"],
797 registry
798 )
799 .unwrap();
800
801 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
802 "stream_over_window_cached_entry_count",
803 "Total entry (partition) count in over window executor cache",
804 &["table_id", "actor_id", "fragment_id"],
805 registry
806 )
807 .unwrap();
808
809 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
810 "stream_over_window_cache_lookup_count",
811 "Over window executor cache lookup count",
812 &["table_id", "actor_id", "fragment_id"],
813 registry
814 )
815 .unwrap();
816
817 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
818 "stream_over_window_cache_miss_count",
819 "Over window executor cache miss count",
820 &["table_id", "actor_id", "fragment_id"],
821 registry
822 )
823 .unwrap();
824
825 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
826 "stream_over_window_range_cache_entry_count",
827 "Over window partition range cache entry count",
828 &["table_id", "actor_id", "fragment_id"],
829 registry,
830 )
831 .unwrap();
832
833 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
834 "stream_over_window_range_cache_lookup_count",
835 "Over window partition range cache lookup count",
836 &["table_id", "actor_id", "fragment_id"],
837 registry
838 )
839 .unwrap();
840
841 let over_window_range_cache_left_miss_count =
842 register_guarded_int_counter_vec_with_registry!(
843 "stream_over_window_range_cache_left_miss_count",
844 "Over window partition range cache left miss count",
845 &["table_id", "actor_id", "fragment_id"],
846 registry
847 )
848 .unwrap();
849
850 let over_window_range_cache_right_miss_count =
851 register_guarded_int_counter_vec_with_registry!(
852 "stream_over_window_range_cache_right_miss_count",
853 "Over window partition range cache right miss count",
854 &["table_id", "actor_id", "fragment_id"],
855 registry
856 )
857 .unwrap();
858
859 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
860 "stream_over_window_accessed_entry_count",
861 "Over window accessed entry count",
862 &["table_id", "actor_id", "fragment_id"],
863 registry
864 )
865 .unwrap();
866
867 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
868 "stream_over_window_compute_count",
869 "Over window compute count",
870 &["table_id", "actor_id", "fragment_id"],
871 registry
872 )
873 .unwrap();
874
875 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
876 "stream_over_window_same_output_count",
877 "Over window same output count",
878 &["table_id", "actor_id", "fragment_id"],
879 registry
880 )
881 .unwrap();
882
883 let opts = histogram_opts!(
884 "stream_barrier_inflight_duration_seconds",
885 "barrier_inflight_latency",
886 exponential_buckets(0.1, 1.5, 16).unwrap() );
888 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
889
890 let opts = histogram_opts!(
891 "stream_barrier_sync_storage_duration_seconds",
892 "barrier_sync_latency",
893 exponential_buckets(0.1, 1.5, 16).unwrap() );
895 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
896
897 let opts = histogram_opts!(
898 "stream_barrier_batch_size",
899 "barrier_batch_size",
900 exponential_buckets(1.0, 2.0, 8).unwrap()
901 );
902 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
903
904 let barrier_manager_progress = register_int_counter_with_registry!(
905 "stream_barrier_manager_progress",
906 "The number of actors that have processed the earliest in-flight barriers",
907 registry
908 )
909 .unwrap();
910
911 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
912 "sync_kv_log_store_wait_next_poll_ns",
913 "Total duration (ns) of waiting for next poll",
914 &["actor_id", "target", "fragment_id", "relation"],
915 registry
916 )
917 .unwrap();
918
919 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
920 "sync_kv_log_store_read_count",
921 "read row count throughput of sync_kv log store",
922 &["type", "actor_id", "target", "fragment_id", "relation"],
923 registry
924 )
925 .unwrap();
926
927 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
928 "sync_kv_log_store_read_size",
929 "read size throughput of sync_kv log store",
930 &["type", "actor_id", "target", "fragment_id", "relation"],
931 registry
932 )
933 .unwrap();
934
935 let sync_kv_log_store_write_pause_duration_ns =
936 register_guarded_int_counter_vec_with_registry!(
937 "sync_kv_log_store_write_pause_duration_ns",
938 "Duration (ns) of sync_kv log store write pause",
939 &["actor_id", "target", "fragment_id", "relation"],
940 registry
941 )
942 .unwrap();
943
944 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
945 "sync_kv_log_store_state",
946 "clean/unclean state transition for sync_kv log store",
947 &["state", "actor_id", "target", "fragment_id", "relation"],
948 registry
949 )
950 .unwrap();
951
952 let sync_kv_log_store_storage_write_count =
953 register_guarded_int_counter_vec_with_registry!(
954 "sync_kv_log_store_storage_write_count",
955 "Write row count throughput of sync_kv log store",
956 &["actor_id", "target", "fragment_id", "relation"],
957 registry
958 )
959 .unwrap();
960
961 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
962 "sync_kv_log_store_storage_write_size",
963 "Write size throughput of sync_kv log store",
964 &["actor_id", "target", "fragment_id", "relation"],
965 registry
966 )
967 .unwrap();
968
969 let sync_kv_log_store_buffer_unconsumed_item_count =
970 register_guarded_int_gauge_vec_with_registry!(
971 "sync_kv_log_store_buffer_unconsumed_item_count",
972 "Number of Unconsumed Item in buffer",
973 &["actor_id", "target", "fragment_id", "relation"],
974 registry
975 )
976 .unwrap();
977
978 let sync_kv_log_store_buffer_unconsumed_row_count =
979 register_guarded_int_gauge_vec_with_registry!(
980 "sync_kv_log_store_buffer_unconsumed_row_count",
981 "Number of Unconsumed Row in buffer",
982 &["actor_id", "target", "fragment_id", "relation"],
983 registry
984 )
985 .unwrap();
986
987 let sync_kv_log_store_buffer_unconsumed_epoch_count =
988 register_guarded_int_gauge_vec_with_registry!(
989 "sync_kv_log_store_buffer_unconsumed_epoch_count",
990 "Number of Unconsumed Epoch in buffer",
991 &["actor_id", "target", "fragment_id", "relation"],
992 registry
993 )
994 .unwrap();
995
996 let sync_kv_log_store_buffer_unconsumed_min_epoch =
997 register_guarded_int_gauge_vec_with_registry!(
998 "sync_kv_log_store_buffer_unconsumed_min_epoch",
999 "Number of Unconsumed Epoch in buffer",
1000 &["actor_id", "target", "fragment_id", "relation"],
1001 registry
1002 )
1003 .unwrap();
1004
1005 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1006 "kv_log_store_storage_write_count",
1007 "Write row count throughput of kv log store",
1008 &["actor_id", "connector", "sink_id", "sink_name"],
1009 registry
1010 )
1011 .unwrap();
1012
1013 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1014 "kv_log_store_storage_write_size",
1015 "Write size throughput of kv log store",
1016 &["actor_id", "connector", "sink_id", "sink_name"],
1017 registry
1018 )
1019 .unwrap();
1020
1021 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1022 "kv_log_store_storage_read_count",
1023 "Write row count throughput of kv log store",
1024 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1025 registry
1026 )
1027 .unwrap();
1028
1029 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1030 "kv_log_store_storage_read_size",
1031 "Write size throughput of kv log store",
1032 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1033 registry
1034 )
1035 .unwrap();
1036
1037 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1038 "kv_log_store_rewind_count",
1039 "Kv log store rewind rate",
1040 &["actor_id", "connector", "sink_id", "sink_name"],
1041 registry
1042 )
1043 .unwrap();
1044
1045 let kv_log_store_rewind_delay_opts = {
1046 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1047 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1048 - REWIND_BASE_DELAY.as_secs_f64().log2())
1049 .ceil() as usize;
1050 let buckets = exponential_buckets(
1051 REWIND_BASE_DELAY.as_secs_f64(),
1052 REWIND_BACKOFF_FACTOR as _,
1053 bucket_count,
1054 )
1055 .unwrap();
1056 histogram_opts!(
1057 "kv_log_store_rewind_delay",
1058 "Kv log store rewind delay",
1059 buckets,
1060 )
1061 };
1062
1063 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1064 kv_log_store_rewind_delay_opts,
1065 &["actor_id", "connector", "sink_id", "sink_name"],
1066 registry
1067 )
1068 .unwrap();
1069
1070 let kv_log_store_buffer_unconsumed_item_count =
1071 register_guarded_int_gauge_vec_with_registry!(
1072 "kv_log_store_buffer_unconsumed_item_count",
1073 "Number of Unconsumed Item in buffer",
1074 &["actor_id", "connector", "sink_id", "sink_name"],
1075 registry
1076 )
1077 .unwrap();
1078
1079 let kv_log_store_buffer_unconsumed_row_count =
1080 register_guarded_int_gauge_vec_with_registry!(
1081 "kv_log_store_buffer_unconsumed_row_count",
1082 "Number of Unconsumed Row in buffer",
1083 &["actor_id", "connector", "sink_id", "sink_name"],
1084 registry
1085 )
1086 .unwrap();
1087
1088 let kv_log_store_buffer_unconsumed_epoch_count =
1089 register_guarded_int_gauge_vec_with_registry!(
1090 "kv_log_store_buffer_unconsumed_epoch_count",
1091 "Number of Unconsumed Epoch in buffer",
1092 &["actor_id", "connector", "sink_id", "sink_name"],
1093 registry
1094 )
1095 .unwrap();
1096
1097 let kv_log_store_buffer_unconsumed_min_epoch =
1098 register_guarded_int_gauge_vec_with_registry!(
1099 "kv_log_store_buffer_unconsumed_min_epoch",
1100 "Number of Unconsumed Epoch in buffer",
1101 &["actor_id", "connector", "sink_id", "sink_name"],
1102 registry
1103 )
1104 .unwrap();
1105
1106 let lru_runtime_loop_count = register_int_counter_with_registry!(
1107 "lru_runtime_loop_count",
1108 "The counts of the eviction loop in LRU manager per second",
1109 registry
1110 )
1111 .unwrap();
1112
1113 let lru_latest_sequence = register_int_gauge_with_registry!(
1114 "lru_latest_sequence",
1115 "Current LRU global sequence",
1116 registry,
1117 )
1118 .unwrap();
1119
1120 let lru_watermark_sequence = register_int_gauge_with_registry!(
1121 "lru_watermark_sequence",
1122 "Current LRU watermark sequence",
1123 registry,
1124 )
1125 .unwrap();
1126
1127 let lru_eviction_policy = register_int_gauge_with_registry!(
1128 "lru_eviction_policy",
1129 "Current LRU eviction policy",
1130 registry,
1131 )
1132 .unwrap();
1133
1134 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1135 "jemalloc_allocated_bytes",
1136 "The allocated memory jemalloc, got from jemalloc_ctl",
1137 registry
1138 )
1139 .unwrap();
1140
1141 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1142 "jemalloc_active_bytes",
1143 "The active memory jemalloc, got from jemalloc_ctl",
1144 registry
1145 )
1146 .unwrap();
1147
1148 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1149 "jemalloc_resident_bytes",
1150 "The active memory jemalloc, got from jemalloc_ctl",
1151 registry
1152 )
1153 .unwrap();
1154
1155 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1156 "jemalloc_metadata_bytes",
1157 "The active memory jemalloc, got from jemalloc_ctl",
1158 registry
1159 )
1160 .unwrap();
1161
1162 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1163 "jvm_allocated_bytes",
1164 "The allocated jvm memory",
1165 registry
1166 )
1167 .unwrap();
1168
1169 let jvm_active_bytes = register_int_gauge_with_registry!(
1170 "jvm_active_bytes",
1171 "The active jvm memory",
1172 registry
1173 )
1174 .unwrap();
1175
1176 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1177 "stream_materialize_cache_hit_count",
1178 "Materialize executor cache hit count",
1179 &["actor_id", "table_id", "fragment_id"],
1180 registry
1181 )
1182 .unwrap()
1183 .relabel_debug_1(level);
1184
1185 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1186 "stream_materialize_data_exist_count",
1187 "Materialize executor data exist count",
1188 &["actor_id", "table_id", "fragment_id"],
1189 registry
1190 )
1191 .unwrap()
1192 .relabel_debug_1(level);
1193
1194 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1195 "stream_materialize_cache_total_count",
1196 "Materialize executor cache total operation",
1197 &["actor_id", "table_id", "fragment_id"],
1198 registry
1199 )
1200 .unwrap()
1201 .relabel_debug_1(level);
1202
1203 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1204 "stream_memory_usage",
1205 "Memory usage for stream executors",
1206 &["actor_id", "table_id", "desc"],
1207 registry
1208 )
1209 .unwrap()
1210 .relabel_debug_1(level);
1211
1212 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1213 "gap_fill_generated_rows_count",
1214 "Total number of rows generated by gap fill executor",
1215 &["actor_id", "fragment_id"],
1216 registry
1217 )
1218 .unwrap()
1219 .relabel_debug_1(level);
1220
1221 Self {
1222 level,
1223 executor_row_count,
1224 mem_stream_node_output_row_count: stream_node_output_row_count,
1225 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1226 actor_execution_time,
1227 actor_scheduled_duration,
1228 actor_scheduled_cnt,
1229 actor_fast_poll_duration,
1230 actor_fast_poll_cnt,
1231 actor_slow_poll_duration,
1232 actor_slow_poll_cnt,
1233 actor_poll_duration,
1234 actor_poll_cnt,
1235 actor_idle_duration,
1236 actor_idle_cnt,
1237 actor_count,
1238 actor_in_record_cnt,
1239 actor_out_record_cnt,
1240 actor_current_epoch,
1241 source_output_row_count,
1242 source_split_change_count,
1243 source_backfill_row_count,
1244 sink_input_row_count,
1245 sink_input_bytes,
1246 sink_chunk_buffer_size,
1247 exchange_frag_recv_size,
1248 merge_barrier_align_duration,
1249 actor_output_buffer_blocking_duration_ns,
1250 actor_input_buffer_blocking_duration_ns,
1251 join_lookup_miss_count,
1252 join_lookup_total_count,
1253 join_insert_cache_miss_count,
1254 join_actor_input_waiting_duration_ns,
1255 join_match_duration_ns,
1256 join_cached_entry_count,
1257 join_matched_join_keys,
1258 barrier_align_duration,
1259 agg_lookup_miss_count,
1260 agg_total_lookup_count,
1261 agg_cached_entry_count,
1262 agg_chunk_lookup_miss_count,
1263 agg_chunk_total_lookup_count,
1264 agg_dirty_groups_count,
1265 agg_dirty_groups_heap_size,
1266 agg_distinct_cache_miss_count,
1267 agg_distinct_total_cache_count,
1268 agg_distinct_cached_entry_count,
1269 agg_state_cache_lookup_count,
1270 agg_state_cache_miss_count,
1271 group_top_n_cache_miss_count,
1272 group_top_n_total_query_cache_count,
1273 group_top_n_cached_entry_count,
1274 group_top_n_appendonly_cache_miss_count,
1275 group_top_n_appendonly_total_query_cache_count,
1276 group_top_n_appendonly_cached_entry_count,
1277 lookup_cache_miss_count,
1278 lookup_total_query_cache_count,
1279 lookup_cached_entry_count,
1280 temporal_join_cache_miss_count,
1281 temporal_join_total_query_cache_count,
1282 temporal_join_cached_entry_count,
1283 backfill_snapshot_read_row_count,
1284 backfill_upstream_output_row_count,
1285 cdc_backfill_snapshot_read_row_count,
1286 cdc_backfill_upstream_output_row_count,
1287 snapshot_backfill_consume_row_count,
1288 over_window_cached_entry_count,
1289 over_window_cache_lookup_count,
1290 over_window_cache_miss_count,
1291 over_window_range_cache_entry_count,
1292 over_window_range_cache_lookup_count,
1293 over_window_range_cache_left_miss_count,
1294 over_window_range_cache_right_miss_count,
1295 over_window_accessed_entry_count,
1296 over_window_compute_count,
1297 over_window_same_output_count,
1298 barrier_inflight_latency,
1299 barrier_sync_latency,
1300 barrier_batch_size,
1301 barrier_manager_progress,
1302 kv_log_store_storage_write_count,
1303 kv_log_store_storage_write_size,
1304 kv_log_store_rewind_count,
1305 kv_log_store_rewind_delay,
1306 kv_log_store_storage_read_count,
1307 kv_log_store_storage_read_size,
1308 kv_log_store_buffer_unconsumed_item_count,
1309 kv_log_store_buffer_unconsumed_row_count,
1310 kv_log_store_buffer_unconsumed_epoch_count,
1311 kv_log_store_buffer_unconsumed_min_epoch,
1312 sync_kv_log_store_read_count,
1313 sync_kv_log_store_read_size,
1314 sync_kv_log_store_write_pause_duration_ns,
1315 sync_kv_log_store_state,
1316 sync_kv_log_store_wait_next_poll_ns,
1317 sync_kv_log_store_storage_write_count,
1318 sync_kv_log_store_storage_write_size,
1319 sync_kv_log_store_buffer_unconsumed_item_count,
1320 sync_kv_log_store_buffer_unconsumed_row_count,
1321 sync_kv_log_store_buffer_unconsumed_epoch_count,
1322 sync_kv_log_store_buffer_unconsumed_min_epoch,
1323 lru_runtime_loop_count,
1324 lru_latest_sequence,
1325 lru_watermark_sequence,
1326 lru_eviction_policy,
1327 jemalloc_allocated_bytes,
1328 jemalloc_active_bytes,
1329 jemalloc_resident_bytes,
1330 jemalloc_metadata_bytes,
1331 jvm_allocated_bytes,
1332 jvm_active_bytes,
1333 stream_memory_usage,
1334 materialize_cache_hit_count,
1335 materialize_data_exist_count,
1336 materialize_cache_total_count,
1337 materialize_input_row_count,
1338 materialize_current_epoch,
1339 pg_cdc_state_table_lsn,
1340 pg_cdc_jni_commit_offset_lsn,
1341 gap_fill_generated_rows_count,
1342 }
1343 }
1344
1345 pub fn unused() -> Self {
1347 global_streaming_metrics(MetricLevel::Disabled)
1348 }
1349
1350 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1351 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1352 let actor_execution_time = self
1353 .actor_execution_time
1354 .with_guarded_label_values(label_list);
1355 let actor_scheduled_duration = self
1356 .actor_scheduled_duration
1357 .with_guarded_label_values(label_list);
1358 let actor_scheduled_cnt = self
1359 .actor_scheduled_cnt
1360 .with_guarded_label_values(label_list);
1361 let actor_fast_poll_duration = self
1362 .actor_fast_poll_duration
1363 .with_guarded_label_values(label_list);
1364 let actor_fast_poll_cnt = self
1365 .actor_fast_poll_cnt
1366 .with_guarded_label_values(label_list);
1367 let actor_slow_poll_duration = self
1368 .actor_slow_poll_duration
1369 .with_guarded_label_values(label_list);
1370 let actor_slow_poll_cnt = self
1371 .actor_slow_poll_cnt
1372 .with_guarded_label_values(label_list);
1373 let actor_poll_duration = self
1374 .actor_poll_duration
1375 .with_guarded_label_values(label_list);
1376 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1377 let actor_idle_duration = self
1378 .actor_idle_duration
1379 .with_guarded_label_values(label_list);
1380 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1381 ActorMetrics {
1382 actor_execution_time,
1383 actor_scheduled_duration,
1384 actor_scheduled_cnt,
1385 actor_fast_poll_duration,
1386 actor_fast_poll_cnt,
1387 actor_slow_poll_duration,
1388 actor_slow_poll_cnt,
1389 actor_poll_duration,
1390 actor_poll_cnt,
1391 actor_idle_duration,
1392 actor_idle_cnt,
1393 }
1394 }
1395
1396 pub(crate) fn new_actor_input_metrics(
1397 &self,
1398 actor_id: ActorId,
1399 fragment_id: FragmentId,
1400 upstream_fragment_id: FragmentId,
1401 ) -> ActorInputMetrics {
1402 let actor_id_str = actor_id.to_string();
1403 let fragment_id_str = fragment_id.to_string();
1404 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1405 ActorInputMetrics {
1406 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1407 &actor_id_str,
1408 &fragment_id_str,
1409 &upstream_fragment_id_str,
1410 ]),
1411 actor_input_buffer_blocking_duration_ns: self
1412 .actor_input_buffer_blocking_duration_ns
1413 .with_guarded_label_values(&[
1414 &actor_id_str,
1415 &fragment_id_str,
1416 &upstream_fragment_id_str,
1417 ]),
1418 }
1419 }
1420
1421 pub fn new_sink_exec_metrics(
1422 &self,
1423 id: SinkId,
1424 actor_id: ActorId,
1425 fragment_id: FragmentId,
1426 ) -> SinkExecutorMetrics {
1427 let label_list: &[&str; 3] = &[
1428 &id.to_string(),
1429 &actor_id.to_string(),
1430 &fragment_id.to_string(),
1431 ];
1432 SinkExecutorMetrics {
1433 sink_input_row_count: self
1434 .sink_input_row_count
1435 .with_guarded_label_values(label_list),
1436 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1437 sink_chunk_buffer_size: self
1438 .sink_chunk_buffer_size
1439 .with_guarded_label_values(label_list),
1440 }
1441 }
1442
1443 pub fn new_group_top_n_metrics(
1444 &self,
1445 table_id: TableId,
1446 actor_id: ActorId,
1447 fragment_id: FragmentId,
1448 ) -> GroupTopNMetrics {
1449 let label_list: &[&str; 3] = &[
1450 &table_id.to_string(),
1451 &actor_id.to_string(),
1452 &fragment_id.to_string(),
1453 ];
1454
1455 GroupTopNMetrics {
1456 group_top_n_cache_miss_count: self
1457 .group_top_n_cache_miss_count
1458 .with_guarded_label_values(label_list),
1459 group_top_n_total_query_cache_count: self
1460 .group_top_n_total_query_cache_count
1461 .with_guarded_label_values(label_list),
1462 group_top_n_cached_entry_count: self
1463 .group_top_n_cached_entry_count
1464 .with_guarded_label_values(label_list),
1465 }
1466 }
1467
1468 pub fn new_append_only_group_top_n_metrics(
1469 &self,
1470 table_id: TableId,
1471 actor_id: ActorId,
1472 fragment_id: FragmentId,
1473 ) -> GroupTopNMetrics {
1474 let label_list: &[&str; 3] = &[
1475 &table_id.to_string(),
1476 &actor_id.to_string(),
1477 &fragment_id.to_string(),
1478 ];
1479
1480 GroupTopNMetrics {
1481 group_top_n_cache_miss_count: self
1482 .group_top_n_appendonly_cache_miss_count
1483 .with_guarded_label_values(label_list),
1484 group_top_n_total_query_cache_count: self
1485 .group_top_n_appendonly_total_query_cache_count
1486 .with_guarded_label_values(label_list),
1487 group_top_n_cached_entry_count: self
1488 .group_top_n_appendonly_cached_entry_count
1489 .with_guarded_label_values(label_list),
1490 }
1491 }
1492
1493 pub fn new_lookup_executor_metrics(
1494 &self,
1495 table_id: TableId,
1496 actor_id: ActorId,
1497 fragment_id: FragmentId,
1498 ) -> LookupExecutorMetrics {
1499 let label_list: &[&str; 3] = &[
1500 &table_id.to_string(),
1501 &actor_id.to_string(),
1502 &fragment_id.to_string(),
1503 ];
1504
1505 LookupExecutorMetrics {
1506 lookup_cache_miss_count: self
1507 .lookup_cache_miss_count
1508 .with_guarded_label_values(label_list),
1509 lookup_total_query_cache_count: self
1510 .lookup_total_query_cache_count
1511 .with_guarded_label_values(label_list),
1512 lookup_cached_entry_count: self
1513 .lookup_cached_entry_count
1514 .with_guarded_label_values(label_list),
1515 }
1516 }
1517
1518 pub fn new_hash_agg_metrics(
1519 &self,
1520 table_id: TableId,
1521 actor_id: ActorId,
1522 fragment_id: FragmentId,
1523 ) -> HashAggMetrics {
1524 let label_list: &[&str; 3] = &[
1525 &table_id.to_string(),
1526 &actor_id.to_string(),
1527 &fragment_id.to_string(),
1528 ];
1529 HashAggMetrics {
1530 agg_lookup_miss_count: self
1531 .agg_lookup_miss_count
1532 .with_guarded_label_values(label_list),
1533 agg_total_lookup_count: self
1534 .agg_total_lookup_count
1535 .with_guarded_label_values(label_list),
1536 agg_cached_entry_count: self
1537 .agg_cached_entry_count
1538 .with_guarded_label_values(label_list),
1539 agg_chunk_lookup_miss_count: self
1540 .agg_chunk_lookup_miss_count
1541 .with_guarded_label_values(label_list),
1542 agg_chunk_total_lookup_count: self
1543 .agg_chunk_total_lookup_count
1544 .with_guarded_label_values(label_list),
1545 agg_dirty_groups_count: self
1546 .agg_dirty_groups_count
1547 .with_guarded_label_values(label_list),
1548 agg_dirty_groups_heap_size: self
1549 .agg_dirty_groups_heap_size
1550 .with_guarded_label_values(label_list),
1551 agg_state_cache_lookup_count: self
1552 .agg_state_cache_lookup_count
1553 .with_guarded_label_values(label_list),
1554 agg_state_cache_miss_count: self
1555 .agg_state_cache_miss_count
1556 .with_guarded_label_values(label_list),
1557 }
1558 }
1559
1560 pub fn new_agg_distinct_dedup_metrics(
1561 &self,
1562 table_id: TableId,
1563 actor_id: ActorId,
1564 fragment_id: FragmentId,
1565 ) -> AggDistinctDedupMetrics {
1566 let label_list: &[&str; 3] = &[
1567 &table_id.to_string(),
1568 &actor_id.to_string(),
1569 &fragment_id.to_string(),
1570 ];
1571 AggDistinctDedupMetrics {
1572 agg_distinct_cache_miss_count: self
1573 .agg_distinct_cache_miss_count
1574 .with_guarded_label_values(label_list),
1575 agg_distinct_total_cache_count: self
1576 .agg_distinct_total_cache_count
1577 .with_guarded_label_values(label_list),
1578 agg_distinct_cached_entry_count: self
1579 .agg_distinct_cached_entry_count
1580 .with_guarded_label_values(label_list),
1581 }
1582 }
1583
1584 pub fn new_temporal_join_metrics(
1585 &self,
1586 table_id: TableId,
1587 actor_id: ActorId,
1588 fragment_id: FragmentId,
1589 ) -> TemporalJoinMetrics {
1590 let label_list: &[&str; 3] = &[
1591 &table_id.to_string(),
1592 &actor_id.to_string(),
1593 &fragment_id.to_string(),
1594 ];
1595 TemporalJoinMetrics {
1596 temporal_join_cache_miss_count: self
1597 .temporal_join_cache_miss_count
1598 .with_guarded_label_values(label_list),
1599 temporal_join_total_query_cache_count: self
1600 .temporal_join_total_query_cache_count
1601 .with_guarded_label_values(label_list),
1602 temporal_join_cached_entry_count: self
1603 .temporal_join_cached_entry_count
1604 .with_guarded_label_values(label_list),
1605 }
1606 }
1607
1608 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1609 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1610 BackfillMetrics {
1611 backfill_snapshot_read_row_count: self
1612 .backfill_snapshot_read_row_count
1613 .with_guarded_label_values(label_list),
1614 backfill_upstream_output_row_count: self
1615 .backfill_upstream_output_row_count
1616 .with_guarded_label_values(label_list),
1617 }
1618 }
1619
1620 pub fn new_cdc_backfill_metrics(
1621 &self,
1622 table_id: TableId,
1623 actor_id: ActorId,
1624 ) -> CdcBackfillMetrics {
1625 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1626 CdcBackfillMetrics {
1627 cdc_backfill_snapshot_read_row_count: self
1628 .cdc_backfill_snapshot_read_row_count
1629 .with_guarded_label_values(label_list),
1630 cdc_backfill_upstream_output_row_count: self
1631 .cdc_backfill_upstream_output_row_count
1632 .with_guarded_label_values(label_list),
1633 }
1634 }
1635
1636 pub fn new_over_window_metrics(
1637 &self,
1638 table_id: TableId,
1639 actor_id: ActorId,
1640 fragment_id: FragmentId,
1641 ) -> OverWindowMetrics {
1642 let label_list: &[&str; 3] = &[
1643 &table_id.to_string(),
1644 &actor_id.to_string(),
1645 &fragment_id.to_string(),
1646 ];
1647 OverWindowMetrics {
1648 over_window_cached_entry_count: self
1649 .over_window_cached_entry_count
1650 .with_guarded_label_values(label_list),
1651 over_window_cache_lookup_count: self
1652 .over_window_cache_lookup_count
1653 .with_guarded_label_values(label_list),
1654 over_window_cache_miss_count: self
1655 .over_window_cache_miss_count
1656 .with_guarded_label_values(label_list),
1657 over_window_range_cache_entry_count: self
1658 .over_window_range_cache_entry_count
1659 .with_guarded_label_values(label_list),
1660 over_window_range_cache_lookup_count: self
1661 .over_window_range_cache_lookup_count
1662 .with_guarded_label_values(label_list),
1663 over_window_range_cache_left_miss_count: self
1664 .over_window_range_cache_left_miss_count
1665 .with_guarded_label_values(label_list),
1666 over_window_range_cache_right_miss_count: self
1667 .over_window_range_cache_right_miss_count
1668 .with_guarded_label_values(label_list),
1669 over_window_accessed_entry_count: self
1670 .over_window_accessed_entry_count
1671 .with_guarded_label_values(label_list),
1672 over_window_compute_count: self
1673 .over_window_compute_count
1674 .with_guarded_label_values(label_list),
1675 over_window_same_output_count: self
1676 .over_window_same_output_count
1677 .with_guarded_label_values(label_list),
1678 }
1679 }
1680
1681 pub fn new_materialize_metrics(
1682 &self,
1683 table_id: TableId,
1684 actor_id: ActorId,
1685 fragment_id: FragmentId,
1686 ) -> MaterializeMetrics {
1687 let label_list: &[&str; 3] = &[
1688 &actor_id.to_string(),
1689 &table_id.to_string(),
1690 &fragment_id.to_string(),
1691 ];
1692 MaterializeMetrics {
1693 materialize_cache_hit_count: self
1694 .materialize_cache_hit_count
1695 .with_guarded_label_values(label_list),
1696 materialize_data_exist_count: self
1697 .materialize_data_exist_count
1698 .with_guarded_label_values(label_list),
1699 materialize_cache_total_count: self
1700 .materialize_cache_total_count
1701 .with_guarded_label_values(label_list),
1702 materialize_input_row_count: self
1703 .materialize_input_row_count
1704 .with_guarded_label_values(label_list),
1705 materialize_current_epoch: self
1706 .materialize_current_epoch
1707 .with_guarded_label_values(label_list),
1708 }
1709 }
1710}
1711
/// Label-bound input-side counters returned by `StreamingMetrics::new_actor_input_metrics`.
///
/// Both counters are resolved with the same three label values: the actor id, the fragment
/// id, and the upstream fragment id.
pub(crate) struct ActorInputMetrics {
    /// Handle resolved from `StreamingMetrics::actor_in_record_cnt`.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Handle resolved from `StreamingMetrics::actor_input_buffer_blocking_duration_ns`.
    /// NOTE(review): unit is presumably nanoseconds, going by the `_ns` suffix — confirm.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1716
/// Label-bound per-actor gauges returned by `StreamingMetrics::new_actor_metrics`.
///
/// Every field is resolved from the `StreamingMetrics` vector of the same name using a
/// single label value, the actor id. The `*_duration` / `*_time` fields are float gauges
/// and the `*_cnt` fields are integer gauges.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1731
/// Label-bound sink executor handles returned by `StreamingMetrics::new_sink_exec_metrics`.
///
/// All fields are resolved with the same three label values: the sink id, the actor id, and
/// the fragment id.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1737
/// Label-bound materialize executor handles returned by
/// `StreamingMetrics::new_materialize_metrics`.
///
/// All fields are resolved with the same three label values: the actor id, the table id,
/// and the fragment id.
pub struct MaterializeMetrics {
    /// Registered with the help text "Materialize executor cache hit count".
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Registered with the help text "Materialize executor data exist count".
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Registered with the help text "Materialize executor cache total operation".
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1745
/// Label-bound group top-n handles.
///
/// Returned by both `StreamingMetrics::new_group_top_n_metrics` (fields backed by the
/// `group_top_n_*` vectors) and `StreamingMetrics::new_append_only_group_top_n_metrics`
/// (fields backed by the `group_top_n_appendonly_*` vectors). In both cases the label
/// values are the table id, the actor id, and the fragment id.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1751
/// Label-bound lookup executor handles returned by
/// `StreamingMetrics::new_lookup_executor_metrics`.
///
/// All fields are resolved with the same three label values: the table id, the actor id,
/// and the fragment id.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1757
/// Label-bound aggregation handles returned by `StreamingMetrics::new_hash_agg_metrics`.
///
/// All fields are resolved from the `StreamingMetrics` vector of the same name with the same
/// three label values: the table id, the actor id, and the fragment id.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1769
/// Label-bound distinct-aggregation handles returned by
/// `StreamingMetrics::new_agg_distinct_dedup_metrics`.
///
/// All fields are resolved with the same three label values: the table id, the actor id,
/// and the fragment id.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1775
/// Label-bound temporal join handles returned by
/// `StreamingMetrics::new_temporal_join_metrics`.
///
/// All fields are resolved with the same three label values: the table id, the actor id,
/// and the fragment id.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1781
/// Label-bound backfill counters returned by `StreamingMetrics::new_backfill_metrics`.
///
/// Both fields are resolved with the same two label values: the table id and the actor id.
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1786
/// Label-bound CDC backfill counters returned by
/// `StreamingMetrics::new_cdc_backfill_metrics`.
///
/// Both fields are resolved with the same two label values: the table id and the actor id.
/// Unlike the other label-bound bundles defined alongside it, this one derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1792
/// Label-bound over-window handles returned by `StreamingMetrics::new_over_window_metrics`.
///
/// All fields are resolved from the `StreamingMetrics` vector of the same name with the same
/// three label values: the table id, the actor id, and the fragment id. The two
/// `*_entry_count` fields are integer gauges; the rest are integer counters.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}