1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26 LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46 pub level: MetricLevel,
47
48 pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51 pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55 pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57 actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59 actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60 actor_poll_duration: RelabeledGuardedIntCounterVec,
61 actor_poll_cnt: RelabeledGuardedIntCounterVec,
62 actor_idle_duration: RelabeledGuardedIntCounterVec,
63 actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65 pub actor_count: LabelGuardedIntGaugeVec,
67 pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68 pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69 pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
70
71 pub source_output_row_count: LabelGuardedIntCounterVec,
73 pub source_split_change_count: LabelGuardedIntCounterVec,
74 pub source_backfill_row_count: LabelGuardedIntCounterVec,
75
76 sink_input_row_count: LabelGuardedIntCounterVec,
78 sink_input_bytes: LabelGuardedIntCounterVec,
79 sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
80
81 pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
83
84 pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
87
88 pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
90 actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91
92 pub join_lookup_miss_count: LabelGuardedIntCounterVec,
94 pub join_lookup_total_count: LabelGuardedIntCounterVec,
95 pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
96 pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
97 pub join_match_duration_ns: LabelGuardedIntCounterVec,
98 pub join_cached_entry_count: LabelGuardedIntGaugeVec,
99 pub join_matched_join_keys: RelabeledGuardedHistogramVec,
100
101 pub barrier_align_duration: RelabeledGuardedIntCounterVec,
103
104 agg_lookup_miss_count: LabelGuardedIntCounterVec,
106 agg_total_lookup_count: LabelGuardedIntCounterVec,
107 agg_cached_entry_count: LabelGuardedIntGaugeVec,
108 agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
109 agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
110 agg_dirty_groups_count: LabelGuardedIntGaugeVec,
111 agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
112 agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
113 agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
114 agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
115 agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
116 agg_state_cache_miss_count: LabelGuardedIntCounterVec,
117
118 group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
120 group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
121 group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
122 group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
124 group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
125 group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
126
127 lookup_cache_miss_count: LabelGuardedIntCounterVec,
129 lookup_total_query_cache_count: LabelGuardedIntCounterVec,
130 lookup_cached_entry_count: LabelGuardedIntGaugeVec,
131
132 temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
134 temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
135 temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
136
137 backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
139 backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
140
141 cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
143 cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
144
145 pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
147
148 over_window_cached_entry_count: LabelGuardedIntGaugeVec,
150 over_window_cache_lookup_count: LabelGuardedIntCounterVec,
151 over_window_cache_miss_count: LabelGuardedIntCounterVec,
152 over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
153 over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
154 over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
155 over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
156 over_window_accessed_entry_count: LabelGuardedIntCounterVec,
157 over_window_compute_count: LabelGuardedIntCounterVec,
158 over_window_same_output_count: LabelGuardedIntCounterVec,
159
160 pub barrier_inflight_latency: Histogram,
164 pub barrier_sync_latency: Histogram,
166 pub barrier_batch_size: Histogram,
167 pub barrier_manager_progress: IntCounter,
169
170 pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
171 pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
172 pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
173 pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
174 pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
175 pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
176 pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
177 pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
178 pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
179 pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
180
181 pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
182 pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
183 pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
184 pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
185 pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
186 pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
187 pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
188 pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
189 pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
190 pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
191 pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
192
193 pub lru_runtime_loop_count: IntCounter,
195 pub lru_latest_sequence: IntGauge,
196 pub lru_watermark_sequence: IntGauge,
197 pub lru_eviction_policy: IntGauge,
198 pub jemalloc_allocated_bytes: IntGauge,
199 pub jemalloc_active_bytes: IntGauge,
200 pub jemalloc_resident_bytes: IntGauge,
201 pub jemalloc_metadata_bytes: IntGauge,
202 pub jvm_allocated_bytes: IntGauge,
203 pub jvm_active_bytes: IntGauge,
204 pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
205
206 materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
208 materialize_data_exist_count: RelabeledGuardedIntCounterVec,
209 materialize_cache_total_count: RelabeledGuardedIntCounterVec,
210 materialize_input_row_count: RelabeledGuardedIntCounterVec,
211 pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
212
213 pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
215 pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
216
217 pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
219 pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
220
221 pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
223}
224
225pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
226
227pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
228 GLOBAL_STREAMING_METRICS
229 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
230 .clone()
231}
232
233impl StreamingMetrics {
234 fn new(registry: &Registry, level: MetricLevel) -> Self {
235 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
236 "stream_executor_row_count",
237 "Total number of rows that have been output from each executor",
238 &["actor_id", "fragment_id", "executor_identity"],
239 registry
240 )
241 .unwrap()
242 .relabel_debug_1(level);
243
244 let stream_node_output_row_count = CountMap::new();
245 let stream_node_output_blocking_duration_ns = CountMap::new();
246
247 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
248 "stream_source_output_rows_counts",
249 "Total number of rows that have been output from source",
250 &["source_id", "source_name", "actor_id", "fragment_id"],
251 registry
252 )
253 .unwrap();
254
255 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
256 "stream_source_split_change_event_count",
257 "Total number of split change events that have been operated by source",
258 &["source_id", "source_name", "actor_id", "fragment_id"],
259 registry
260 )
261 .unwrap();
262
263 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
264 "stream_source_backfill_rows_counts",
265 "Total number of rows that have been backfilled for source",
266 &["source_id", "source_name", "actor_id", "fragment_id"],
267 registry
268 )
269 .unwrap();
270
271 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
272 "stream_sink_input_row_count",
273 "Total number of rows streamed into sink executors",
274 &["sink_id", "actor_id", "fragment_id"],
275 registry
276 )
277 .unwrap();
278
279 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
280 "stream_sink_input_bytes",
281 "Total size of chunks streamed into sink executors",
282 &["sink_id", "actor_id", "fragment_id"],
283 registry
284 )
285 .unwrap();
286
287 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
288 "stream_mview_input_row_count",
289 "Total number of rows streamed into materialize executors",
290 &["actor_id", "table_id", "fragment_id"],
291 registry
292 )
293 .unwrap()
294 .relabel_debug_1(level);
295
296 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
297 "stream_mview_current_epoch",
298 "The current epoch of the materialized executor",
299 &["actor_id", "table_id", "fragment_id"],
300 registry
301 )
302 .unwrap()
303 .relabel_debug_1(level);
304
305 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
306 "stream_pg_cdc_state_table_lsn",
307 "Current LSN value stored in PostgreSQL CDC state table",
308 &["source_id"],
309 registry,
310 )
311 .unwrap();
312
313 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
314 "stream_pg_cdc_jni_commit_offset_lsn",
315 "LSN value when JNI commit offset is called for PostgreSQL CDC",
316 &["source_id"],
317 registry,
318 )
319 .unwrap();
320
321 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
322 "stream_mysql_cdc_state_binlog_file_seq",
323 "Current binlog file sequence number stored in MySQL CDC state table",
324 &["source_id"],
325 registry,
326 )
327 .unwrap();
328
329 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
330 "stream_mysql_cdc_state_binlog_position",
331 "Current binlog position stored in MySQL CDC state table",
332 &["source_id"],
333 registry,
334 )
335 .unwrap();
336
337 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
338 "stream_sink_chunk_buffer_size",
339 "Total size of chunks buffered in a barrier",
340 &["sink_id", "actor_id", "fragment_id"],
341 registry
342 )
343 .unwrap();
344 let actor_output_buffer_blocking_duration_ns =
345 register_guarded_int_counter_vec_with_registry!(
346 "stream_actor_output_buffer_blocking_duration_ns",
347 "Total blocking duration (ns) of output buffer",
348 &["actor_id", "fragment_id", "downstream_fragment_id"],
349 registry
350 )
351 .unwrap()
352 .relabel_debug_1(level);
354
355 let actor_input_buffer_blocking_duration_ns =
356 register_guarded_int_counter_vec_with_registry!(
357 "stream_actor_input_buffer_blocking_duration_ns",
358 "Total blocking duration (ns) of input buffer",
359 &["actor_id", "fragment_id", "upstream_fragment_id"],
360 registry
361 )
362 .unwrap()
363 .relabel_debug_1(level);
365
366 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
367 "stream_exchange_frag_recv_size",
368 "Total size of messages that have been received from upstream Fragment",
369 &["up_fragment_id", "down_fragment_id"],
370 registry
371 )
372 .unwrap();
373
374 let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
375 "stream_actor_poll_duration",
376 "tokio's metrics",
377 &["actor_id", "fragment_id"],
378 registry
379 )
380 .unwrap()
381 .relabel_debug_1(level);
382
383 let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
384 "stream_actor_poll_cnt",
385 "tokio's metrics",
386 &["actor_id", "fragment_id"],
387 registry
388 )
389 .unwrap()
390 .relabel_debug_1(level);
391
392 let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
393 "stream_actor_scheduled_duration",
394 "tokio's metrics",
395 &["actor_id", "fragment_id"],
396 registry
397 )
398 .unwrap()
399 .relabel_debug_1(level);
400
401 let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
402 "stream_actor_scheduled_cnt",
403 "tokio's metrics",
404 &["actor_id", "fragment_id"],
405 registry
406 )
407 .unwrap()
408 .relabel_debug_1(level);
409
410 let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
411 "stream_actor_idle_duration",
412 "tokio's metrics",
413 &["actor_id", "fragment_id"],
414 registry
415 )
416 .unwrap()
417 .relabel_debug_1(level);
418
419 let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
420 "stream_actor_idle_cnt",
421 "tokio's metrics",
422 &["actor_id", "fragment_id"],
423 registry
424 )
425 .unwrap()
426 .relabel_debug_1(level);
427
428 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
429 "stream_actor_in_record_cnt",
430 "Total number of rows actor received",
431 &["actor_id", "fragment_id", "upstream_fragment_id"],
432 registry
433 )
434 .unwrap()
435 .relabel_debug_1(level);
436
437 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
438 "stream_actor_out_record_cnt",
439 "Total number of rows actor sent",
440 &["actor_id", "fragment_id"],
441 registry
442 )
443 .unwrap()
444 .relabel_debug_1(level);
445
446 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
447 "stream_actor_current_epoch",
448 "Current epoch of actor",
449 &["actor_id", "fragment_id"],
450 registry
451 )
452 .unwrap()
453 .relabel_debug_1(level);
454
455 let actor_count = register_guarded_int_gauge_vec_with_registry!(
456 "stream_actor_count",
457 "Total number of actors (parallelism)",
458 &["fragment_id"],
459 registry
460 )
461 .unwrap();
462
463 let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
464 "stream_merge_barrier_align_duration_ns",
465 "Total merge barrier alignment duration (ns)",
466 &["actor_id", "fragment_id"],
467 registry
468 )
469 .unwrap()
470 .relabel_debug_1(level);
471
472 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
473 "stream_join_lookup_miss_count",
474 "Join executor lookup miss duration",
475 &["side", "join_table_id", "actor_id", "fragment_id"],
476 registry
477 )
478 .unwrap();
479
480 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
481 "stream_join_lookup_total_count",
482 "Join executor lookup total operation",
483 &["side", "join_table_id", "actor_id", "fragment_id"],
484 registry
485 )
486 .unwrap();
487
488 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
489 "stream_join_insert_cache_miss_count",
490 "Join executor cache miss when insert operation",
491 &["side", "join_table_id", "actor_id", "fragment_id"],
492 registry
493 )
494 .unwrap();
495
496 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
497 "stream_join_actor_input_waiting_duration_ns",
498 "Total waiting duration (ns) of input buffer of join actor",
499 &["actor_id", "fragment_id"],
500 registry
501 )
502 .unwrap();
503
504 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
505 "stream_join_match_duration_ns",
506 "Matching duration for each side",
507 &["actor_id", "fragment_id", "side"],
508 registry
509 )
510 .unwrap();
511
512 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
513 "stream_barrier_align_duration_ns",
514 "Duration of join align barrier",
515 &["actor_id", "fragment_id", "wait_side", "executor"],
516 registry
517 )
518 .unwrap()
519 .relabel_debug_1(level);
520
521 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
522 "stream_join_cached_entry_count",
523 "Number of cached entries in streaming join operators",
524 &["actor_id", "fragment_id", "side"],
525 registry
526 )
527 .unwrap();
528
529 let join_matched_join_keys_opts = histogram_opts!(
530 "stream_join_matched_join_keys",
531 "The number of keys matched in the opposite side",
532 exponential_buckets(16.0, 2.0, 28).unwrap() );
534
535 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
536 join_matched_join_keys_opts,
537 &["actor_id", "fragment_id", "table_id"],
538 registry
539 )
540 .unwrap()
541 .relabel_debug_1(level);
542
543 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
544 "stream_agg_lookup_miss_count",
545 "Aggregation executor lookup miss duration",
546 &["table_id", "actor_id", "fragment_id"],
547 registry
548 )
549 .unwrap();
550
551 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
552 "stream_agg_lookup_total_count",
553 "Aggregation executor lookup total operation",
554 &["table_id", "actor_id", "fragment_id"],
555 registry
556 )
557 .unwrap();
558
559 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
560 "stream_agg_distinct_cache_miss_count",
561 "Aggregation executor dinsinct miss duration",
562 &["table_id", "actor_id", "fragment_id"],
563 registry
564 )
565 .unwrap();
566
567 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
568 "stream_agg_distinct_total_cache_count",
569 "Aggregation executor distinct total operation",
570 &["table_id", "actor_id", "fragment_id"],
571 registry
572 )
573 .unwrap();
574
575 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
576 "stream_agg_distinct_cached_entry_count",
577 "Total entry counts in distinct aggregation executor cache",
578 &["table_id", "actor_id", "fragment_id"],
579 registry
580 )
581 .unwrap();
582
583 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
584 "stream_agg_dirty_groups_count",
585 "Total dirty group counts in aggregation executor",
586 &["table_id", "actor_id", "fragment_id"],
587 registry
588 )
589 .unwrap();
590
591 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
592 "stream_agg_dirty_groups_heap_size",
593 "Total dirty group heap size in aggregation executor",
594 &["table_id", "actor_id", "fragment_id"],
595 registry
596 )
597 .unwrap();
598
599 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
600 "stream_agg_state_cache_lookup_count",
601 "Aggregation executor state cache lookup count",
602 &["table_id", "actor_id", "fragment_id"],
603 registry
604 )
605 .unwrap();
606
607 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
608 "stream_agg_state_cache_miss_count",
609 "Aggregation executor state cache miss count",
610 &["table_id", "actor_id", "fragment_id"],
611 registry
612 )
613 .unwrap();
614
615 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
616 "stream_group_top_n_cache_miss_count",
617 "Group top n executor cache miss count",
618 &["table_id", "actor_id", "fragment_id"],
619 registry
620 )
621 .unwrap();
622
623 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
624 "stream_group_top_n_total_query_cache_count",
625 "Group top n executor query cache total count",
626 &["table_id", "actor_id", "fragment_id"],
627 registry
628 )
629 .unwrap();
630
631 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
632 "stream_group_top_n_cached_entry_count",
633 "Total entry counts in group top n executor cache",
634 &["table_id", "actor_id", "fragment_id"],
635 registry
636 )
637 .unwrap();
638
639 let group_top_n_appendonly_cache_miss_count =
640 register_guarded_int_counter_vec_with_registry!(
641 "stream_group_top_n_appendonly_cache_miss_count",
642 "Group top n appendonly executor cache miss count",
643 &["table_id", "actor_id", "fragment_id"],
644 registry
645 )
646 .unwrap();
647
648 let group_top_n_appendonly_total_query_cache_count =
649 register_guarded_int_counter_vec_with_registry!(
650 "stream_group_top_n_appendonly_total_query_cache_count",
651 "Group top n appendonly executor total cache count",
652 &["table_id", "actor_id", "fragment_id"],
653 registry
654 )
655 .unwrap();
656
657 let group_top_n_appendonly_cached_entry_count =
658 register_guarded_int_gauge_vec_with_registry!(
659 "stream_group_top_n_appendonly_cached_entry_count",
660 "Total entry counts in group top n appendonly executor cache",
661 &["table_id", "actor_id", "fragment_id"],
662 registry
663 )
664 .unwrap();
665
666 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
667 "stream_lookup_cache_miss_count",
668 "Lookup executor cache miss count",
669 &["table_id", "actor_id", "fragment_id"],
670 registry
671 )
672 .unwrap();
673
674 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
675 "stream_lookup_total_query_cache_count",
676 "Lookup executor query cache total count",
677 &["table_id", "actor_id", "fragment_id"],
678 registry
679 )
680 .unwrap();
681
682 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
683 "stream_lookup_cached_entry_count",
684 "Total entry counts in lookup executor cache",
685 &["table_id", "actor_id", "fragment_id"],
686 registry
687 )
688 .unwrap();
689
690 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
691 "stream_temporal_join_cache_miss_count",
692 "Temporal join executor cache miss count",
693 &["table_id", "actor_id", "fragment_id"],
694 registry
695 )
696 .unwrap();
697
698 let temporal_join_total_query_cache_count =
699 register_guarded_int_counter_vec_with_registry!(
700 "stream_temporal_join_total_query_cache_count",
701 "Temporal join executor query cache total count",
702 &["table_id", "actor_id", "fragment_id"],
703 registry
704 )
705 .unwrap();
706
707 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
708 "stream_temporal_join_cached_entry_count",
709 "Total entry count in temporal join executor cache",
710 &["table_id", "actor_id", "fragment_id"],
711 registry
712 )
713 .unwrap();
714
715 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
716 "stream_agg_cached_entry_count",
717 "Number of cached keys in streaming aggregation operators",
718 &["table_id", "actor_id", "fragment_id"],
719 registry
720 )
721 .unwrap();
722
723 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
724 "stream_agg_chunk_lookup_miss_count",
725 "Aggregation executor chunk-level lookup miss duration",
726 &["table_id", "actor_id", "fragment_id"],
727 registry
728 )
729 .unwrap();
730
731 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
732 "stream_agg_chunk_lookup_total_count",
733 "Aggregation executor chunk-level lookup total operation",
734 &["table_id", "actor_id", "fragment_id"],
735 registry
736 )
737 .unwrap();
738
739 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
740 "stream_backfill_snapshot_read_row_count",
741 "Total number of rows that have been read from the backfill snapshot",
742 &["table_id", "actor_id"],
743 registry
744 )
745 .unwrap();
746
747 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
748 "stream_backfill_upstream_output_row_count",
749 "Total number of rows that have been output from the backfill upstream",
750 &["table_id", "actor_id"],
751 registry
752 )
753 .unwrap();
754
755 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
756 "stream_cdc_backfill_snapshot_read_row_count",
757 "Total number of rows that have been read from the cdc_backfill snapshot",
758 &["table_id", "actor_id"],
759 registry
760 )
761 .unwrap();
762
763 let cdc_backfill_upstream_output_row_count =
764 register_guarded_int_counter_vec_with_registry!(
765 "stream_cdc_backfill_upstream_output_row_count",
766 "Total number of rows that have been output from the cdc_backfill upstream",
767 &["table_id", "actor_id"],
768 registry
769 )
770 .unwrap();
771
772 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
773 "stream_snapshot_backfill_consume_snapshot_row_count",
774 "Total number of rows that have been output from snapshot backfill",
775 &["table_id", "actor_id", "stage"],
776 registry
777 )
778 .unwrap();
779
780 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
781 "stream_over_window_cached_entry_count",
782 "Total entry (partition) count in over window executor cache",
783 &["table_id", "actor_id", "fragment_id"],
784 registry
785 )
786 .unwrap();
787
788 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
789 "stream_over_window_cache_lookup_count",
790 "Over window executor cache lookup count",
791 &["table_id", "actor_id", "fragment_id"],
792 registry
793 )
794 .unwrap();
795
796 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
797 "stream_over_window_cache_miss_count",
798 "Over window executor cache miss count",
799 &["table_id", "actor_id", "fragment_id"],
800 registry
801 )
802 .unwrap();
803
804 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
805 "stream_over_window_range_cache_entry_count",
806 "Over window partition range cache entry count",
807 &["table_id", "actor_id", "fragment_id"],
808 registry,
809 )
810 .unwrap();
811
812 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
813 "stream_over_window_range_cache_lookup_count",
814 "Over window partition range cache lookup count",
815 &["table_id", "actor_id", "fragment_id"],
816 registry
817 )
818 .unwrap();
819
820 let over_window_range_cache_left_miss_count =
821 register_guarded_int_counter_vec_with_registry!(
822 "stream_over_window_range_cache_left_miss_count",
823 "Over window partition range cache left miss count",
824 &["table_id", "actor_id", "fragment_id"],
825 registry
826 )
827 .unwrap();
828
829 let over_window_range_cache_right_miss_count =
830 register_guarded_int_counter_vec_with_registry!(
831 "stream_over_window_range_cache_right_miss_count",
832 "Over window partition range cache right miss count",
833 &["table_id", "actor_id", "fragment_id"],
834 registry
835 )
836 .unwrap();
837
838 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
839 "stream_over_window_accessed_entry_count",
840 "Over window accessed entry count",
841 &["table_id", "actor_id", "fragment_id"],
842 registry
843 )
844 .unwrap();
845
846 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
847 "stream_over_window_compute_count",
848 "Over window compute count",
849 &["table_id", "actor_id", "fragment_id"],
850 registry
851 )
852 .unwrap();
853
854 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
855 "stream_over_window_same_output_count",
856 "Over window same output count",
857 &["table_id", "actor_id", "fragment_id"],
858 registry
859 )
860 .unwrap();
861
862 let opts = histogram_opts!(
863 "stream_barrier_inflight_duration_seconds",
864 "barrier_inflight_latency",
865 exponential_buckets(0.1, 1.5, 16).unwrap() );
867 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
868
869 let opts = histogram_opts!(
870 "stream_barrier_sync_storage_duration_seconds",
871 "barrier_sync_latency",
872 exponential_buckets(0.1, 1.5, 16).unwrap() );
874 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
875
876 let opts = histogram_opts!(
877 "stream_barrier_batch_size",
878 "barrier_batch_size",
879 exponential_buckets(1.0, 2.0, 8).unwrap()
880 );
881 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
882
883 let barrier_manager_progress = register_int_counter_with_registry!(
884 "stream_barrier_manager_progress",
885 "The number of actors that have processed the earliest in-flight barriers",
886 registry
887 )
888 .unwrap();
889
890 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
891 "sync_kv_log_store_wait_next_poll_ns",
892 "Total duration (ns) of waiting for next poll",
893 &["actor_id", "target", "fragment_id", "relation"],
894 registry
895 )
896 .unwrap();
897
898 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
899 "sync_kv_log_store_read_count",
900 "read row count throughput of sync_kv log store",
901 &["type", "actor_id", "target", "fragment_id", "relation"],
902 registry
903 )
904 .unwrap();
905
906 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
907 "sync_kv_log_store_read_size",
908 "read size throughput of sync_kv log store",
909 &["type", "actor_id", "target", "fragment_id", "relation"],
910 registry
911 )
912 .unwrap();
913
914 let sync_kv_log_store_write_pause_duration_ns =
915 register_guarded_int_counter_vec_with_registry!(
916 "sync_kv_log_store_write_pause_duration_ns",
917 "Duration (ns) of sync_kv log store write pause",
918 &["actor_id", "target", "fragment_id", "relation"],
919 registry
920 )
921 .unwrap();
922
923 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
924 "sync_kv_log_store_state",
925 "clean/unclean state transition for sync_kv log store",
926 &["state", "actor_id", "target", "fragment_id", "relation"],
927 registry
928 )
929 .unwrap();
930
931 let sync_kv_log_store_storage_write_count =
932 register_guarded_int_counter_vec_with_registry!(
933 "sync_kv_log_store_storage_write_count",
934 "Write row count throughput of sync_kv log store",
935 &["actor_id", "target", "fragment_id", "relation"],
936 registry
937 )
938 .unwrap();
939
940 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
941 "sync_kv_log_store_storage_write_size",
942 "Write size throughput of sync_kv log store",
943 &["actor_id", "target", "fragment_id", "relation"],
944 registry
945 )
946 .unwrap();
947
948 let sync_kv_log_store_buffer_unconsumed_item_count =
949 register_guarded_int_gauge_vec_with_registry!(
950 "sync_kv_log_store_buffer_unconsumed_item_count",
951 "Number of Unconsumed Item in buffer",
952 &["actor_id", "target", "fragment_id", "relation"],
953 registry
954 )
955 .unwrap();
956
957 let sync_kv_log_store_buffer_unconsumed_row_count =
958 register_guarded_int_gauge_vec_with_registry!(
959 "sync_kv_log_store_buffer_unconsumed_row_count",
960 "Number of Unconsumed Row in buffer",
961 &["actor_id", "target", "fragment_id", "relation"],
962 registry
963 )
964 .unwrap();
965
966 let sync_kv_log_store_buffer_unconsumed_epoch_count =
967 register_guarded_int_gauge_vec_with_registry!(
968 "sync_kv_log_store_buffer_unconsumed_epoch_count",
969 "Number of Unconsumed Epoch in buffer",
970 &["actor_id", "target", "fragment_id", "relation"],
971 registry
972 )
973 .unwrap();
974
975 let sync_kv_log_store_buffer_unconsumed_min_epoch =
976 register_guarded_int_gauge_vec_with_registry!(
977 "sync_kv_log_store_buffer_unconsumed_min_epoch",
978 "Number of Unconsumed Epoch in buffer",
979 &["actor_id", "target", "fragment_id", "relation"],
980 registry
981 )
982 .unwrap();
983
984 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
985 "kv_log_store_storage_write_count",
986 "Write row count throughput of kv log store",
987 &["actor_id", "connector", "sink_id", "sink_name"],
988 registry
989 )
990 .unwrap();
991
992 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
993 "kv_log_store_storage_write_size",
994 "Write size throughput of kv log store",
995 &["actor_id", "connector", "sink_id", "sink_name"],
996 registry
997 )
998 .unwrap();
999
1000 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1001 "kv_log_store_storage_read_count",
1002 "Write row count throughput of kv log store",
1003 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1004 registry
1005 )
1006 .unwrap();
1007
1008 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1009 "kv_log_store_storage_read_size",
1010 "Write size throughput of kv log store",
1011 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1012 registry
1013 )
1014 .unwrap();
1015
1016 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1017 "kv_log_store_rewind_count",
1018 "Kv log store rewind rate",
1019 &["actor_id", "connector", "sink_id", "sink_name"],
1020 registry
1021 )
1022 .unwrap();
1023
1024 let kv_log_store_rewind_delay_opts = {
1025 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1026 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1027 - REWIND_BASE_DELAY.as_secs_f64().log2())
1028 .ceil() as usize;
1029 let buckets = exponential_buckets(
1030 REWIND_BASE_DELAY.as_secs_f64(),
1031 REWIND_BACKOFF_FACTOR as _,
1032 bucket_count,
1033 )
1034 .unwrap();
1035 histogram_opts!(
1036 "kv_log_store_rewind_delay",
1037 "Kv log store rewind delay",
1038 buckets,
1039 )
1040 };
1041
1042 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1043 kv_log_store_rewind_delay_opts,
1044 &["actor_id", "connector", "sink_id", "sink_name"],
1045 registry
1046 )
1047 .unwrap();
1048
1049 let kv_log_store_buffer_unconsumed_item_count =
1050 register_guarded_int_gauge_vec_with_registry!(
1051 "kv_log_store_buffer_unconsumed_item_count",
1052 "Number of Unconsumed Item in buffer",
1053 &["actor_id", "connector", "sink_id", "sink_name"],
1054 registry
1055 )
1056 .unwrap();
1057
1058 let kv_log_store_buffer_unconsumed_row_count =
1059 register_guarded_int_gauge_vec_with_registry!(
1060 "kv_log_store_buffer_unconsumed_row_count",
1061 "Number of Unconsumed Row in buffer",
1062 &["actor_id", "connector", "sink_id", "sink_name"],
1063 registry
1064 )
1065 .unwrap();
1066
1067 let kv_log_store_buffer_unconsumed_epoch_count =
1068 register_guarded_int_gauge_vec_with_registry!(
1069 "kv_log_store_buffer_unconsumed_epoch_count",
1070 "Number of Unconsumed Epoch in buffer",
1071 &["actor_id", "connector", "sink_id", "sink_name"],
1072 registry
1073 )
1074 .unwrap();
1075
1076 let kv_log_store_buffer_unconsumed_min_epoch =
1077 register_guarded_int_gauge_vec_with_registry!(
1078 "kv_log_store_buffer_unconsumed_min_epoch",
1079 "Number of Unconsumed Epoch in buffer",
1080 &["actor_id", "connector", "sink_id", "sink_name"],
1081 registry
1082 )
1083 .unwrap();
1084
1085 let lru_runtime_loop_count = register_int_counter_with_registry!(
1086 "lru_runtime_loop_count",
1087 "The counts of the eviction loop in LRU manager per second",
1088 registry
1089 )
1090 .unwrap();
1091
1092 let lru_latest_sequence = register_int_gauge_with_registry!(
1093 "lru_latest_sequence",
1094 "Current LRU global sequence",
1095 registry,
1096 )
1097 .unwrap();
1098
1099 let lru_watermark_sequence = register_int_gauge_with_registry!(
1100 "lru_watermark_sequence",
1101 "Current LRU watermark sequence",
1102 registry,
1103 )
1104 .unwrap();
1105
1106 let lru_eviction_policy = register_int_gauge_with_registry!(
1107 "lru_eviction_policy",
1108 "Current LRU eviction policy",
1109 registry,
1110 )
1111 .unwrap();
1112
1113 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1114 "jemalloc_allocated_bytes",
1115 "The allocated memory jemalloc, got from jemalloc_ctl",
1116 registry
1117 )
1118 .unwrap();
1119
1120 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1121 "jemalloc_active_bytes",
1122 "The active memory jemalloc, got from jemalloc_ctl",
1123 registry
1124 )
1125 .unwrap();
1126
1127 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1128 "jemalloc_resident_bytes",
1129 "The active memory jemalloc, got from jemalloc_ctl",
1130 registry
1131 )
1132 .unwrap();
1133
1134 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1135 "jemalloc_metadata_bytes",
1136 "The active memory jemalloc, got from jemalloc_ctl",
1137 registry
1138 )
1139 .unwrap();
1140
1141 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1142 "jvm_allocated_bytes",
1143 "The allocated jvm memory",
1144 registry
1145 )
1146 .unwrap();
1147
1148 let jvm_active_bytes = register_int_gauge_with_registry!(
1149 "jvm_active_bytes",
1150 "The active jvm memory",
1151 registry
1152 )
1153 .unwrap();
1154
1155 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1156 "stream_materialize_cache_hit_count",
1157 "Materialize executor cache hit count",
1158 &["actor_id", "table_id", "fragment_id"],
1159 registry
1160 )
1161 .unwrap()
1162 .relabel_debug_1(level);
1163
1164 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1165 "stream_materialize_data_exist_count",
1166 "Materialize executor data exist count",
1167 &["actor_id", "table_id", "fragment_id"],
1168 registry
1169 )
1170 .unwrap()
1171 .relabel_debug_1(level);
1172
1173 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1174 "stream_materialize_cache_total_count",
1175 "Materialize executor cache total operation",
1176 &["actor_id", "table_id", "fragment_id"],
1177 registry
1178 )
1179 .unwrap()
1180 .relabel_debug_1(level);
1181
1182 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1183 "stream_memory_usage",
1184 "Memory usage for stream executors",
1185 &["actor_id", "table_id", "desc"],
1186 registry
1187 )
1188 .unwrap()
1189 .relabel_debug_1(level);
1190
1191 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1192 "gap_fill_generated_rows_count",
1193 "Total number of rows generated by gap fill executor",
1194 &["actor_id", "fragment_id"],
1195 registry
1196 )
1197 .unwrap()
1198 .relabel_debug_1(level);
1199
1200 Self {
1201 level,
1202 executor_row_count,
1203 mem_stream_node_output_row_count: stream_node_output_row_count,
1204 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1205 actor_scheduled_duration,
1206 actor_scheduled_cnt,
1207 actor_poll_duration,
1208 actor_poll_cnt,
1209 actor_idle_duration,
1210 actor_idle_cnt,
1211 actor_count,
1212 actor_in_record_cnt,
1213 actor_out_record_cnt,
1214 actor_current_epoch,
1215 source_output_row_count,
1216 source_split_change_count,
1217 source_backfill_row_count,
1218 sink_input_row_count,
1219 sink_input_bytes,
1220 sink_chunk_buffer_size,
1221 exchange_frag_recv_size,
1222 merge_barrier_align_duration,
1223 actor_output_buffer_blocking_duration_ns,
1224 actor_input_buffer_blocking_duration_ns,
1225 join_lookup_miss_count,
1226 join_lookup_total_count,
1227 join_insert_cache_miss_count,
1228 join_actor_input_waiting_duration_ns,
1229 join_match_duration_ns,
1230 join_cached_entry_count,
1231 join_matched_join_keys,
1232 barrier_align_duration,
1233 agg_lookup_miss_count,
1234 agg_total_lookup_count,
1235 agg_cached_entry_count,
1236 agg_chunk_lookup_miss_count,
1237 agg_chunk_total_lookup_count,
1238 agg_dirty_groups_count,
1239 agg_dirty_groups_heap_size,
1240 agg_distinct_cache_miss_count,
1241 agg_distinct_total_cache_count,
1242 agg_distinct_cached_entry_count,
1243 agg_state_cache_lookup_count,
1244 agg_state_cache_miss_count,
1245 group_top_n_cache_miss_count,
1246 group_top_n_total_query_cache_count,
1247 group_top_n_cached_entry_count,
1248 group_top_n_appendonly_cache_miss_count,
1249 group_top_n_appendonly_total_query_cache_count,
1250 group_top_n_appendonly_cached_entry_count,
1251 lookup_cache_miss_count,
1252 lookup_total_query_cache_count,
1253 lookup_cached_entry_count,
1254 temporal_join_cache_miss_count,
1255 temporal_join_total_query_cache_count,
1256 temporal_join_cached_entry_count,
1257 backfill_snapshot_read_row_count,
1258 backfill_upstream_output_row_count,
1259 cdc_backfill_snapshot_read_row_count,
1260 cdc_backfill_upstream_output_row_count,
1261 snapshot_backfill_consume_row_count,
1262 over_window_cached_entry_count,
1263 over_window_cache_lookup_count,
1264 over_window_cache_miss_count,
1265 over_window_range_cache_entry_count,
1266 over_window_range_cache_lookup_count,
1267 over_window_range_cache_left_miss_count,
1268 over_window_range_cache_right_miss_count,
1269 over_window_accessed_entry_count,
1270 over_window_compute_count,
1271 over_window_same_output_count,
1272 barrier_inflight_latency,
1273 barrier_sync_latency,
1274 barrier_batch_size,
1275 barrier_manager_progress,
1276 kv_log_store_storage_write_count,
1277 kv_log_store_storage_write_size,
1278 kv_log_store_rewind_count,
1279 kv_log_store_rewind_delay,
1280 kv_log_store_storage_read_count,
1281 kv_log_store_storage_read_size,
1282 kv_log_store_buffer_unconsumed_item_count,
1283 kv_log_store_buffer_unconsumed_row_count,
1284 kv_log_store_buffer_unconsumed_epoch_count,
1285 kv_log_store_buffer_unconsumed_min_epoch,
1286 sync_kv_log_store_read_count,
1287 sync_kv_log_store_read_size,
1288 sync_kv_log_store_write_pause_duration_ns,
1289 sync_kv_log_store_state,
1290 sync_kv_log_store_wait_next_poll_ns,
1291 sync_kv_log_store_storage_write_count,
1292 sync_kv_log_store_storage_write_size,
1293 sync_kv_log_store_buffer_unconsumed_item_count,
1294 sync_kv_log_store_buffer_unconsumed_row_count,
1295 sync_kv_log_store_buffer_unconsumed_epoch_count,
1296 sync_kv_log_store_buffer_unconsumed_min_epoch,
1297 lru_runtime_loop_count,
1298 lru_latest_sequence,
1299 lru_watermark_sequence,
1300 lru_eviction_policy,
1301 jemalloc_allocated_bytes,
1302 jemalloc_active_bytes,
1303 jemalloc_resident_bytes,
1304 jemalloc_metadata_bytes,
1305 jvm_allocated_bytes,
1306 jvm_active_bytes,
1307 stream_memory_usage,
1308 materialize_cache_hit_count,
1309 materialize_data_exist_count,
1310 materialize_cache_total_count,
1311 materialize_input_row_count,
1312 materialize_current_epoch,
1313 pg_cdc_state_table_lsn,
1314 pg_cdc_jni_commit_offset_lsn,
1315 mysql_cdc_state_binlog_file_seq,
1316 mysql_cdc_state_binlog_position,
1317 gap_fill_generated_rows_count,
1318 }
1319 }
1320
1321 pub fn unused() -> Self {
1323 global_streaming_metrics(MetricLevel::Disabled)
1324 }
1325
1326 pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1327 let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1328 let actor_scheduled_duration = self
1329 .actor_scheduled_duration
1330 .with_guarded_label_values(label_list);
1331 let actor_scheduled_cnt = self
1332 .actor_scheduled_cnt
1333 .with_guarded_label_values(label_list);
1334 let actor_poll_duration = self
1335 .actor_poll_duration
1336 .with_guarded_label_values(label_list);
1337 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1338 let actor_idle_duration = self
1339 .actor_idle_duration
1340 .with_guarded_label_values(label_list);
1341 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1342 ActorMetrics {
1343 actor_scheduled_duration,
1344 actor_scheduled_cnt,
1345 actor_poll_duration,
1346 actor_poll_cnt,
1347 actor_idle_duration,
1348 actor_idle_cnt,
1349 }
1350 }
1351
1352 pub(crate) fn new_actor_input_metrics(
1353 &self,
1354 actor_id: ActorId,
1355 fragment_id: FragmentId,
1356 upstream_fragment_id: FragmentId,
1357 ) -> ActorInputMetrics {
1358 let actor_id_str = actor_id.to_string();
1359 let fragment_id_str = fragment_id.to_string();
1360 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1361 ActorInputMetrics {
1362 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1363 &actor_id_str,
1364 &fragment_id_str,
1365 &upstream_fragment_id_str,
1366 ]),
1367 actor_input_buffer_blocking_duration_ns: self
1368 .actor_input_buffer_blocking_duration_ns
1369 .with_guarded_label_values(&[
1370 &actor_id_str,
1371 &fragment_id_str,
1372 &upstream_fragment_id_str,
1373 ]),
1374 }
1375 }
1376
1377 pub fn new_sink_exec_metrics(
1378 &self,
1379 id: SinkId,
1380 actor_id: ActorId,
1381 fragment_id: FragmentId,
1382 ) -> SinkExecutorMetrics {
1383 let label_list: &[&str; 3] = &[
1384 &id.to_string(),
1385 &actor_id.to_string(),
1386 &fragment_id.to_string(),
1387 ];
1388 SinkExecutorMetrics {
1389 sink_input_row_count: self
1390 .sink_input_row_count
1391 .with_guarded_label_values(label_list),
1392 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1393 sink_chunk_buffer_size: self
1394 .sink_chunk_buffer_size
1395 .with_guarded_label_values(label_list),
1396 }
1397 }
1398
1399 pub fn new_group_top_n_metrics(
1400 &self,
1401 table_id: TableId,
1402 actor_id: ActorId,
1403 fragment_id: FragmentId,
1404 ) -> GroupTopNMetrics {
1405 let label_list: &[&str; 3] = &[
1406 &table_id.to_string(),
1407 &actor_id.to_string(),
1408 &fragment_id.to_string(),
1409 ];
1410
1411 GroupTopNMetrics {
1412 group_top_n_cache_miss_count: self
1413 .group_top_n_cache_miss_count
1414 .with_guarded_label_values(label_list),
1415 group_top_n_total_query_cache_count: self
1416 .group_top_n_total_query_cache_count
1417 .with_guarded_label_values(label_list),
1418 group_top_n_cached_entry_count: self
1419 .group_top_n_cached_entry_count
1420 .with_guarded_label_values(label_list),
1421 }
1422 }
1423
1424 pub fn new_append_only_group_top_n_metrics(
1425 &self,
1426 table_id: TableId,
1427 actor_id: ActorId,
1428 fragment_id: FragmentId,
1429 ) -> GroupTopNMetrics {
1430 let label_list: &[&str; 3] = &[
1431 &table_id.to_string(),
1432 &actor_id.to_string(),
1433 &fragment_id.to_string(),
1434 ];
1435
1436 GroupTopNMetrics {
1437 group_top_n_cache_miss_count: self
1438 .group_top_n_appendonly_cache_miss_count
1439 .with_guarded_label_values(label_list),
1440 group_top_n_total_query_cache_count: self
1441 .group_top_n_appendonly_total_query_cache_count
1442 .with_guarded_label_values(label_list),
1443 group_top_n_cached_entry_count: self
1444 .group_top_n_appendonly_cached_entry_count
1445 .with_guarded_label_values(label_list),
1446 }
1447 }
1448
1449 pub fn new_lookup_executor_metrics(
1450 &self,
1451 table_id: TableId,
1452 actor_id: ActorId,
1453 fragment_id: FragmentId,
1454 ) -> LookupExecutorMetrics {
1455 let label_list: &[&str; 3] = &[
1456 &table_id.to_string(),
1457 &actor_id.to_string(),
1458 &fragment_id.to_string(),
1459 ];
1460
1461 LookupExecutorMetrics {
1462 lookup_cache_miss_count: self
1463 .lookup_cache_miss_count
1464 .with_guarded_label_values(label_list),
1465 lookup_total_query_cache_count: self
1466 .lookup_total_query_cache_count
1467 .with_guarded_label_values(label_list),
1468 lookup_cached_entry_count: self
1469 .lookup_cached_entry_count
1470 .with_guarded_label_values(label_list),
1471 }
1472 }
1473
1474 pub fn new_hash_agg_metrics(
1475 &self,
1476 table_id: TableId,
1477 actor_id: ActorId,
1478 fragment_id: FragmentId,
1479 ) -> HashAggMetrics {
1480 let label_list: &[&str; 3] = &[
1481 &table_id.to_string(),
1482 &actor_id.to_string(),
1483 &fragment_id.to_string(),
1484 ];
1485 HashAggMetrics {
1486 agg_lookup_miss_count: self
1487 .agg_lookup_miss_count
1488 .with_guarded_label_values(label_list),
1489 agg_total_lookup_count: self
1490 .agg_total_lookup_count
1491 .with_guarded_label_values(label_list),
1492 agg_cached_entry_count: self
1493 .agg_cached_entry_count
1494 .with_guarded_label_values(label_list),
1495 agg_chunk_lookup_miss_count: self
1496 .agg_chunk_lookup_miss_count
1497 .with_guarded_label_values(label_list),
1498 agg_chunk_total_lookup_count: self
1499 .agg_chunk_total_lookup_count
1500 .with_guarded_label_values(label_list),
1501 agg_dirty_groups_count: self
1502 .agg_dirty_groups_count
1503 .with_guarded_label_values(label_list),
1504 agg_dirty_groups_heap_size: self
1505 .agg_dirty_groups_heap_size
1506 .with_guarded_label_values(label_list),
1507 agg_state_cache_lookup_count: self
1508 .agg_state_cache_lookup_count
1509 .with_guarded_label_values(label_list),
1510 agg_state_cache_miss_count: self
1511 .agg_state_cache_miss_count
1512 .with_guarded_label_values(label_list),
1513 }
1514 }
1515
1516 pub fn new_agg_distinct_dedup_metrics(
1517 &self,
1518 table_id: TableId,
1519 actor_id: ActorId,
1520 fragment_id: FragmentId,
1521 ) -> AggDistinctDedupMetrics {
1522 let label_list: &[&str; 3] = &[
1523 &table_id.to_string(),
1524 &actor_id.to_string(),
1525 &fragment_id.to_string(),
1526 ];
1527 AggDistinctDedupMetrics {
1528 agg_distinct_cache_miss_count: self
1529 .agg_distinct_cache_miss_count
1530 .with_guarded_label_values(label_list),
1531 agg_distinct_total_cache_count: self
1532 .agg_distinct_total_cache_count
1533 .with_guarded_label_values(label_list),
1534 agg_distinct_cached_entry_count: self
1535 .agg_distinct_cached_entry_count
1536 .with_guarded_label_values(label_list),
1537 }
1538 }
1539
1540 pub fn new_temporal_join_metrics(
1541 &self,
1542 table_id: TableId,
1543 actor_id: ActorId,
1544 fragment_id: FragmentId,
1545 ) -> TemporalJoinMetrics {
1546 let label_list: &[&str; 3] = &[
1547 &table_id.to_string(),
1548 &actor_id.to_string(),
1549 &fragment_id.to_string(),
1550 ];
1551 TemporalJoinMetrics {
1552 temporal_join_cache_miss_count: self
1553 .temporal_join_cache_miss_count
1554 .with_guarded_label_values(label_list),
1555 temporal_join_total_query_cache_count: self
1556 .temporal_join_total_query_cache_count
1557 .with_guarded_label_values(label_list),
1558 temporal_join_cached_entry_count: self
1559 .temporal_join_cached_entry_count
1560 .with_guarded_label_values(label_list),
1561 }
1562 }
1563
1564 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1565 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1566 BackfillMetrics {
1567 backfill_snapshot_read_row_count: self
1568 .backfill_snapshot_read_row_count
1569 .with_guarded_label_values(label_list),
1570 backfill_upstream_output_row_count: self
1571 .backfill_upstream_output_row_count
1572 .with_guarded_label_values(label_list),
1573 }
1574 }
1575
1576 pub fn new_cdc_backfill_metrics(
1577 &self,
1578 table_id: TableId,
1579 actor_id: ActorId,
1580 ) -> CdcBackfillMetrics {
1581 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1582 CdcBackfillMetrics {
1583 cdc_backfill_snapshot_read_row_count: self
1584 .cdc_backfill_snapshot_read_row_count
1585 .with_guarded_label_values(label_list),
1586 cdc_backfill_upstream_output_row_count: self
1587 .cdc_backfill_upstream_output_row_count
1588 .with_guarded_label_values(label_list),
1589 }
1590 }
1591
1592 pub fn new_over_window_metrics(
1593 &self,
1594 table_id: TableId,
1595 actor_id: ActorId,
1596 fragment_id: FragmentId,
1597 ) -> OverWindowMetrics {
1598 let label_list: &[&str; 3] = &[
1599 &table_id.to_string(),
1600 &actor_id.to_string(),
1601 &fragment_id.to_string(),
1602 ];
1603 OverWindowMetrics {
1604 over_window_cached_entry_count: self
1605 .over_window_cached_entry_count
1606 .with_guarded_label_values(label_list),
1607 over_window_cache_lookup_count: self
1608 .over_window_cache_lookup_count
1609 .with_guarded_label_values(label_list),
1610 over_window_cache_miss_count: self
1611 .over_window_cache_miss_count
1612 .with_guarded_label_values(label_list),
1613 over_window_range_cache_entry_count: self
1614 .over_window_range_cache_entry_count
1615 .with_guarded_label_values(label_list),
1616 over_window_range_cache_lookup_count: self
1617 .over_window_range_cache_lookup_count
1618 .with_guarded_label_values(label_list),
1619 over_window_range_cache_left_miss_count: self
1620 .over_window_range_cache_left_miss_count
1621 .with_guarded_label_values(label_list),
1622 over_window_range_cache_right_miss_count: self
1623 .over_window_range_cache_right_miss_count
1624 .with_guarded_label_values(label_list),
1625 over_window_accessed_entry_count: self
1626 .over_window_accessed_entry_count
1627 .with_guarded_label_values(label_list),
1628 over_window_compute_count: self
1629 .over_window_compute_count
1630 .with_guarded_label_values(label_list),
1631 over_window_same_output_count: self
1632 .over_window_same_output_count
1633 .with_guarded_label_values(label_list),
1634 }
1635 }
1636
1637 pub fn new_materialize_cache_metrics(
1638 &self,
1639 table_id: TableId,
1640 actor_id: ActorId,
1641 fragment_id: FragmentId,
1642 ) -> MaterializeCacheMetrics {
1643 let label_list: &[&str; 3] = &[
1644 &actor_id.to_string(),
1645 &table_id.to_string(),
1646 &fragment_id.to_string(),
1647 ];
1648 MaterializeCacheMetrics {
1649 materialize_cache_hit_count: self
1650 .materialize_cache_hit_count
1651 .with_guarded_label_values(label_list),
1652 materialize_data_exist_count: self
1653 .materialize_data_exist_count
1654 .with_guarded_label_values(label_list),
1655 materialize_cache_total_count: self
1656 .materialize_cache_total_count
1657 .with_guarded_label_values(label_list),
1658 }
1659 }
1660
1661 pub fn new_materialize_metrics(
1662 &self,
1663 table_id: TableId,
1664 actor_id: ActorId,
1665 fragment_id: FragmentId,
1666 ) -> MaterializeMetrics {
1667 let label_list: &[&str; 3] = &[
1668 &actor_id.to_string(),
1669 &table_id.to_string(),
1670 &fragment_id.to_string(),
1671 ];
1672 MaterializeMetrics {
1673 materialize_input_row_count: self
1674 .materialize_input_row_count
1675 .with_guarded_label_values(label_list),
1676 materialize_current_epoch: self
1677 .materialize_current_epoch
1678 .with_guarded_label_values(label_list),
1679 }
1680 }
1681}
1682
1683pub(crate) struct ActorInputMetrics {
1684 pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
1685 pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
1686}
1687
1688pub struct ActorMetrics {
1690 pub actor_scheduled_duration: LabelGuardedIntCounter,
1691 pub actor_scheduled_cnt: LabelGuardedIntCounter,
1692 pub actor_poll_duration: LabelGuardedIntCounter,
1693 pub actor_poll_cnt: LabelGuardedIntCounter,
1694 pub actor_idle_duration: LabelGuardedIntCounter,
1695 pub actor_idle_cnt: LabelGuardedIntCounter,
1696}
1697
1698pub struct SinkExecutorMetrics {
1699 pub sink_input_row_count: LabelGuardedIntCounter,
1700 pub sink_input_bytes: LabelGuardedIntCounter,
1701 pub sink_chunk_buffer_size: LabelGuardedIntGauge,
1702}
1703
1704pub struct MaterializeCacheMetrics {
1705 pub materialize_cache_hit_count: LabelGuardedIntCounter,
1706 pub materialize_data_exist_count: LabelGuardedIntCounter,
1707 pub materialize_cache_total_count: LabelGuardedIntCounter,
1708}
1709
1710pub struct MaterializeMetrics {
1711 pub materialize_input_row_count: LabelGuardedIntCounter,
1712 pub materialize_current_epoch: LabelGuardedIntGauge,
1713}
1714
1715pub struct GroupTopNMetrics {
1716 pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
1717 pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
1718 pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
1719}
1720
1721pub struct LookupExecutorMetrics {
1722 pub lookup_cache_miss_count: LabelGuardedIntCounter,
1723 pub lookup_total_query_cache_count: LabelGuardedIntCounter,
1724 pub lookup_cached_entry_count: LabelGuardedIntGauge,
1725}
1726
1727pub struct HashAggMetrics {
1728 pub agg_lookup_miss_count: LabelGuardedIntCounter,
1729 pub agg_total_lookup_count: LabelGuardedIntCounter,
1730 pub agg_cached_entry_count: LabelGuardedIntGauge,
1731 pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
1732 pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
1733 pub agg_dirty_groups_count: LabelGuardedIntGauge,
1734 pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
1735 pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
1736 pub agg_state_cache_miss_count: LabelGuardedIntCounter,
1737}
1738
1739pub struct AggDistinctDedupMetrics {
1740 pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
1741 pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
1742 pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
1743}
1744
1745pub struct TemporalJoinMetrics {
1746 pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
1747 pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
1748 pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
1749}
1750
1751pub struct BackfillMetrics {
1752 pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1753 pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
1754}
1755
1756#[derive(Clone)]
1757pub struct CdcBackfillMetrics {
1758 pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
1759 pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
1760}
1761
1762pub struct OverWindowMetrics {
1763 pub over_window_cached_entry_count: LabelGuardedIntGauge,
1764 pub over_window_cache_lookup_count: LabelGuardedIntCounter,
1765 pub over_window_cache_miss_count: LabelGuardedIntCounter,
1766 pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
1767 pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
1768 pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
1769 pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
1770 pub over_window_accessed_entry_count: LabelGuardedIntCounter,
1771 pub over_window_compute_count: LabelGuardedIntCounter,
1772 pub over_window_same_output_count: LabelGuardedIntCounter,
1773}