1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26 LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
/// Prometheus metric handles shared by the streaming engine: executors, actors, sources,
/// sinks, exchange, barriers, log stores and memory-management gauges.
///
/// Every handle is created in [`StreamingMetrics::new`], which registers it against the
/// `Registry` passed in. [`global_streaming_metrics`] builds one instance on
/// `GLOBAL_METRICS_REGISTRY` and returns clones of it.
///
/// Fields typed `Relabeled*` are produced in `new` by calling `.relabel_debug_1(level)` on the
/// registered guarded vec. In the registrations visible here the first label is always
/// `actor_id`. NOTE(review): presumably `relabel_debug_1` collapses that first label when
/// `level` is below debug — confirm against `MetricVecRelabelExt`.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level handed to [`StreamingMetrics::new`] (presumably stored as-is); `new`
    /// passes the same level to every `relabel_debug_1` call.
    pub level: MetricLevel,

    // ---- Streaming executors ----
    /// `stream_executor_row_count{actor_id, fragment_id, executor_identity}`: total number of
    /// rows output from each executor.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    /// In-memory per-executor output row counter, keyed by `ExecutorId`.
    /// NOTE(review): presumably backed by the `CountMap::new()` created in `new`, which is not
    /// registered with `registry` — confirm.
    pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
    /// In-memory per-executor output blocking duration (nanoseconds, per the `_ns` suffix),
    /// keyed by `ExecutorId`.
    pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,

    // ---- Actor scheduling: `stream_actor_{scheduled,poll,idle}_{duration,cnt}` ----
    // All six are labelled `{actor_id, fragment_id}` and registered with the help text
    // "tokio's metrics".
    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
    actor_poll_duration: RelabeledGuardedIntCounterVec,
    actor_poll_cnt: RelabeledGuardedIntCounterVec,
    actor_idle_duration: RelabeledGuardedIntCounterVec,
    actor_idle_cnt: RelabeledGuardedIntCounterVec,

    // ---- Actor throughput ----
    /// `stream_actor_count{fragment_id}`: total number of actors (parallelism).
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt{actor_id, fragment_id, upstream_fragment_id}`: rows received.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt{actor_id, fragment_id}`: rows sent.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch{actor_id, fragment_id}`: current epoch of the actor.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Source: labels `{source_id, source_name, actor_id, fragment_id}` ----
    /// `stream_source_output_rows_counts`: rows output from the source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled by the source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: rows backfilled for the source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // ---- Sink: labels `{sink_id, actor_id, fragment_id}` ----
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // ---- Exchange / merge ----
    /// `stream_exchange_frag_recv_size{up_fragment_id, down_fragment_id}`: total size of
    /// messages received from the upstream fragment.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    /// `stream_merge_barrier_align_duration_ns{actor_id, fragment_id}`: total merge barrier
    /// alignment duration (ns).
    pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,

    /// `stream_actor_output_buffer_blocking_duration_ns{actor_id, fragment_id,
    /// downstream_fragment_id}`: total blocking duration (ns) of the output buffer.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns{actor_id, fragment_id,
    /// upstream_fragment_id}`: total blocking duration (ns) of the input buffer.
    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,

    // ---- Hash join ----
    // The three cache counters below are labelled `{side, join_table_id, actor_id, fragment_id}`.
    /// `stream_join_lookup_miss_count`: join lookup misses.
    /// NOTE(review): its registered help text says "lookup miss duration", which does not
    /// match a miss counter.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`: total join lookups.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns{actor_id, fragment_id}`: total waiting
    /// duration (ns) on the join actor's input buffer.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns{actor_id, fragment_id, side}`: matching duration per side.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count{actor_id, fragment_id, side}`: cached join entries.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys{actor_id, fragment_id, table_id}`: histogram of keys
    /// matched on the opposite side (28 exponential buckets starting at 16, factor 2).
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    /// `stream_barrier_align_duration_ns{actor_id, fragment_id, wait_side, executor}`: barrier
    /// alignment duration (help text: "Duration of join align barrier").
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Hash aggregation: labels `{table_id, actor_id, fragment_id}` ----
    // Metric names are `stream_<field name>`, except `agg_total_lookup_count`
    // (`stream_agg_lookup_total_count`) and `agg_chunk_total_lookup_count`
    // (`stream_agg_chunk_lookup_total_count`).
    // NOTE(review): both `*_lookup_miss_count` counters are registered with help text that
    // says "miss duration".
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // ---- Group top-N (plain and append-only): labels `{table_id, actor_id, fragment_id}` ----
    // Metric names are `stream_<field name>`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Lookup executor cache: labels `{table_id, actor_id, fragment_id}` ----
    // Metric names are `stream_<field name>`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Temporal join cache: labels `{table_id, actor_id, fragment_id}` ----
    // Metric names are `stream_<field name>`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Backfill: labels `{table_id, actor_id}`; metric names are `stream_<field name>` ----
    /// Rows read from the backfill snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Rows output from the backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    /// Rows read from the CDC backfill snapshot.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// Rows output from the CDC backfill upstream.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    /// `stream_snapshot_backfill_consume_snapshot_row_count{table_id, actor_id, stage}`: rows
    /// output from snapshot backfill, split by `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // ---- Over window: labels `{table_id, actor_id, fragment_id}` ----
    // Metric names are `stream_<field name>`.
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    over_window_compute_count: LabelGuardedIntCounterVec,
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // ---- Barriers (unlabelled) ----
    /// `stream_barrier_inflight_duration_seconds`: 16 exponential buckets from 0.1 s, factor 1.5.
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds`: same bucket layout as
    /// `barrier_inflight_latency`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size`: 8 exponential buckets from 1, factor 2.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have processed the earliest
    /// in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // ---- KV log store ----
    // NOTE(review): presumably `kv_log_store_rewind_delay` builds its buckets from the imported
    // `REWIND_BASE_DELAY` / `REWIND_BACKOFF_FACTOR` / `REWIND_MAX_DELAY` — confirm in `new`.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,

    pub crossdb_last_consumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Sync KV log store: metric names equal the field names ----
    // Base labels are `{actor_id, target, fragment_id, relation}`. `read_count`/`read_size`
    // add a leading `type` label and `state` adds a leading `state` label.
    /// Read row-count throughput.
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// Read size throughput.
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// Duration (ns) of write pauses.
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// Clean/unclean state transitions.
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// Total duration (ns) spent waiting for the next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// Write row-count throughput.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// Write size throughput.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// Number of unconsumed items in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// Number of unconsumed rows in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// Number of unconsumed epochs in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// Minimum unconsumed epoch in the buffer (per the metric name).
    /// NOTE(review): registered with the same help text as `..._unconsumed_epoch_count`
    /// ("Number of Unconsumed Epoch in buffer"), which looks like a copy-paste slip.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
    pub sync_kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,

    // ---- LRU manager, allocator and JVM memory gauges ----
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // ---- Materialize ----
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count{actor_id, table_id, fragment_id}`: rows streamed into
    /// materialize executors.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch{actor_id, table_id, fragment_id}`: current epoch of the
    /// materialize executor.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- CDC offsets: labels `{source_id}` ----
    /// `stream_pg_cdc_state_table_lsn`: current LSN stored in the PostgreSQL CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN passed when the JNI commit offset is called.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,

    /// `stream_mysql_cdc_state_binlog_file_seq`: binlog file sequence number stored in the
    /// MySQL CDC state table.
    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
    /// `stream_mysql_cdc_state_binlog_position`: binlog position stored in the MySQL CDC state
    /// table.
    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,

    // ---- Gap fill ----
    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,

    // ---- State table access ----
    pub state_table_iter_count: RelabeledGuardedIntCounterVec,
    pub state_table_get_count: RelabeledGuardedIntCounterVec,
    pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
    pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
}
234
235pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
236
237pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
238 GLOBAL_STREAMING_METRICS
239 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
240 .clone()
241}
242
243impl StreamingMetrics {
244 pub fn new(registry: &Registry, level: MetricLevel) -> Self {
245 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
246 "stream_executor_row_count",
247 "Total number of rows that have been output from each executor",
248 &["actor_id", "fragment_id", "executor_identity"],
249 registry
250 )
251 .unwrap()
252 .relabel_debug_1(level);
253
254 let stream_node_output_row_count = CountMap::new();
255 let stream_node_output_blocking_duration_ns = CountMap::new();
256
257 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
258 "stream_source_output_rows_counts",
259 "Total number of rows that have been output from source",
260 &["source_id", "source_name", "actor_id", "fragment_id"],
261 registry
262 )
263 .unwrap();
264
265 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
266 "stream_source_split_change_event_count",
267 "Total number of split change events that have been operated by source",
268 &["source_id", "source_name", "actor_id", "fragment_id"],
269 registry
270 )
271 .unwrap();
272
273 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
274 "stream_source_backfill_rows_counts",
275 "Total number of rows that have been backfilled for source",
276 &["source_id", "source_name", "actor_id", "fragment_id"],
277 registry
278 )
279 .unwrap();
280
281 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
282 "stream_sink_input_row_count",
283 "Total number of rows streamed into sink executors",
284 &["sink_id", "actor_id", "fragment_id"],
285 registry
286 )
287 .unwrap();
288
289 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
290 "stream_sink_input_bytes",
291 "Total size of chunks streamed into sink executors",
292 &["sink_id", "actor_id", "fragment_id"],
293 registry
294 )
295 .unwrap();
296
297 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
298 "stream_mview_input_row_count",
299 "Total number of rows streamed into materialize executors",
300 &["actor_id", "table_id", "fragment_id"],
301 registry
302 )
303 .unwrap()
304 .relabel_debug_1(level);
305
306 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
307 "stream_mview_current_epoch",
308 "The current epoch of the materialized executor",
309 &["actor_id", "table_id", "fragment_id"],
310 registry
311 )
312 .unwrap()
313 .relabel_debug_1(level);
314
315 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
316 "stream_pg_cdc_state_table_lsn",
317 "Current LSN value stored in PostgreSQL CDC state table",
318 &["source_id"],
319 registry,
320 )
321 .unwrap();
322
323 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
324 "stream_pg_cdc_jni_commit_offset_lsn",
325 "LSN value when JNI commit offset is called for PostgreSQL CDC",
326 &["source_id"],
327 registry,
328 )
329 .unwrap();
330
331 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
332 "stream_mysql_cdc_state_binlog_file_seq",
333 "Current binlog file sequence number stored in MySQL CDC state table",
334 &["source_id"],
335 registry,
336 )
337 .unwrap();
338
339 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
340 "stream_mysql_cdc_state_binlog_position",
341 "Current binlog position stored in MySQL CDC state table",
342 &["source_id"],
343 registry,
344 )
345 .unwrap();
346
347 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
348 "stream_sink_chunk_buffer_size",
349 "Total size of chunks buffered in a barrier",
350 &["sink_id", "actor_id", "fragment_id"],
351 registry
352 )
353 .unwrap();
354 let actor_output_buffer_blocking_duration_ns =
355 register_guarded_int_counter_vec_with_registry!(
356 "stream_actor_output_buffer_blocking_duration_ns",
357 "Total blocking duration (ns) of output buffer",
358 &["actor_id", "fragment_id", "downstream_fragment_id"],
359 registry
360 )
361 .unwrap()
362 .relabel_debug_1(level);
364
365 let actor_input_buffer_blocking_duration_ns =
366 register_guarded_int_counter_vec_with_registry!(
367 "stream_actor_input_buffer_blocking_duration_ns",
368 "Total blocking duration (ns) of input buffer",
369 &["actor_id", "fragment_id", "upstream_fragment_id"],
370 registry
371 )
372 .unwrap()
373 .relabel_debug_1(level);
375
376 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
377 "stream_exchange_frag_recv_size",
378 "Total size of messages that have been received from upstream Fragment",
379 &["up_fragment_id", "down_fragment_id"],
380 registry
381 )
382 .unwrap();
383
384 let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
385 "stream_actor_poll_duration",
386 "tokio's metrics",
387 &["actor_id", "fragment_id"],
388 registry
389 )
390 .unwrap()
391 .relabel_debug_1(level);
392
393 let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
394 "stream_actor_poll_cnt",
395 "tokio's metrics",
396 &["actor_id", "fragment_id"],
397 registry
398 )
399 .unwrap()
400 .relabel_debug_1(level);
401
402 let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
403 "stream_actor_scheduled_duration",
404 "tokio's metrics",
405 &["actor_id", "fragment_id"],
406 registry
407 )
408 .unwrap()
409 .relabel_debug_1(level);
410
411 let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
412 "stream_actor_scheduled_cnt",
413 "tokio's metrics",
414 &["actor_id", "fragment_id"],
415 registry
416 )
417 .unwrap()
418 .relabel_debug_1(level);
419
420 let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
421 "stream_actor_idle_duration",
422 "tokio's metrics",
423 &["actor_id", "fragment_id"],
424 registry
425 )
426 .unwrap()
427 .relabel_debug_1(level);
428
429 let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
430 "stream_actor_idle_cnt",
431 "tokio's metrics",
432 &["actor_id", "fragment_id"],
433 registry
434 )
435 .unwrap()
436 .relabel_debug_1(level);
437
438 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
439 "stream_actor_in_record_cnt",
440 "Total number of rows actor received",
441 &["actor_id", "fragment_id", "upstream_fragment_id"],
442 registry
443 )
444 .unwrap()
445 .relabel_debug_1(level);
446
447 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
448 "stream_actor_out_record_cnt",
449 "Total number of rows actor sent",
450 &["actor_id", "fragment_id"],
451 registry
452 )
453 .unwrap()
454 .relabel_debug_1(level);
455
456 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
457 "stream_actor_current_epoch",
458 "Current epoch of actor",
459 &["actor_id", "fragment_id"],
460 registry
461 )
462 .unwrap()
463 .relabel_debug_1(level);
464
465 let actor_count = register_guarded_int_gauge_vec_with_registry!(
466 "stream_actor_count",
467 "Total number of actors (parallelism)",
468 &["fragment_id"],
469 registry
470 )
471 .unwrap();
472
473 let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
474 "stream_merge_barrier_align_duration_ns",
475 "Total merge barrier alignment duration (ns)",
476 &["actor_id", "fragment_id"],
477 registry
478 )
479 .unwrap()
480 .relabel_debug_1(level);
481
482 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
483 "stream_join_lookup_miss_count",
484 "Join executor lookup miss duration",
485 &["side", "join_table_id", "actor_id", "fragment_id"],
486 registry
487 )
488 .unwrap();
489
490 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
491 "stream_join_lookup_total_count",
492 "Join executor lookup total operation",
493 &["side", "join_table_id", "actor_id", "fragment_id"],
494 registry
495 )
496 .unwrap();
497
498 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
499 "stream_join_insert_cache_miss_count",
500 "Join executor cache miss when insert operation",
501 &["side", "join_table_id", "actor_id", "fragment_id"],
502 registry
503 )
504 .unwrap();
505
506 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
507 "stream_join_actor_input_waiting_duration_ns",
508 "Total waiting duration (ns) of input buffer of join actor",
509 &["actor_id", "fragment_id"],
510 registry
511 )
512 .unwrap();
513
514 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
515 "stream_join_match_duration_ns",
516 "Matching duration for each side",
517 &["actor_id", "fragment_id", "side"],
518 registry
519 )
520 .unwrap();
521
522 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
523 "stream_barrier_align_duration_ns",
524 "Duration of join align barrier",
525 &["actor_id", "fragment_id", "wait_side", "executor"],
526 registry
527 )
528 .unwrap()
529 .relabel_debug_1(level);
530
531 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
532 "stream_join_cached_entry_count",
533 "Number of cached entries in streaming join operators",
534 &["actor_id", "fragment_id", "side"],
535 registry
536 )
537 .unwrap();
538
539 let join_matched_join_keys_opts = histogram_opts!(
540 "stream_join_matched_join_keys",
541 "The number of keys matched in the opposite side",
542 exponential_buckets(16.0, 2.0, 28).unwrap() );
544
545 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
546 join_matched_join_keys_opts,
547 &["actor_id", "fragment_id", "table_id"],
548 registry
549 )
550 .unwrap()
551 .relabel_debug_1(level);
552
553 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
554 "stream_agg_lookup_miss_count",
555 "Aggregation executor lookup miss duration",
556 &["table_id", "actor_id", "fragment_id"],
557 registry
558 )
559 .unwrap();
560
561 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
562 "stream_agg_lookup_total_count",
563 "Aggregation executor lookup total operation",
564 &["table_id", "actor_id", "fragment_id"],
565 registry
566 )
567 .unwrap();
568
569 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
570 "stream_agg_distinct_cache_miss_count",
571 "Aggregation executor dinsinct miss duration",
572 &["table_id", "actor_id", "fragment_id"],
573 registry
574 )
575 .unwrap();
576
577 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
578 "stream_agg_distinct_total_cache_count",
579 "Aggregation executor distinct total operation",
580 &["table_id", "actor_id", "fragment_id"],
581 registry
582 )
583 .unwrap();
584
585 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
586 "stream_agg_distinct_cached_entry_count",
587 "Total entry counts in distinct aggregation executor cache",
588 &["table_id", "actor_id", "fragment_id"],
589 registry
590 )
591 .unwrap();
592
593 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
594 "stream_agg_dirty_groups_count",
595 "Total dirty group counts in aggregation executor",
596 &["table_id", "actor_id", "fragment_id"],
597 registry
598 )
599 .unwrap();
600
601 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
602 "stream_agg_dirty_groups_heap_size",
603 "Total dirty group heap size in aggregation executor",
604 &["table_id", "actor_id", "fragment_id"],
605 registry
606 )
607 .unwrap();
608
609 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
610 "stream_agg_state_cache_lookup_count",
611 "Aggregation executor state cache lookup count",
612 &["table_id", "actor_id", "fragment_id"],
613 registry
614 )
615 .unwrap();
616
617 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
618 "stream_agg_state_cache_miss_count",
619 "Aggregation executor state cache miss count",
620 &["table_id", "actor_id", "fragment_id"],
621 registry
622 )
623 .unwrap();
624
625 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
626 "stream_group_top_n_cache_miss_count",
627 "Group top n executor cache miss count",
628 &["table_id", "actor_id", "fragment_id"],
629 registry
630 )
631 .unwrap();
632
633 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
634 "stream_group_top_n_total_query_cache_count",
635 "Group top n executor query cache total count",
636 &["table_id", "actor_id", "fragment_id"],
637 registry
638 )
639 .unwrap();
640
641 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
642 "stream_group_top_n_cached_entry_count",
643 "Total entry counts in group top n executor cache",
644 &["table_id", "actor_id", "fragment_id"],
645 registry
646 )
647 .unwrap();
648
649 let group_top_n_appendonly_cache_miss_count =
650 register_guarded_int_counter_vec_with_registry!(
651 "stream_group_top_n_appendonly_cache_miss_count",
652 "Group top n appendonly executor cache miss count",
653 &["table_id", "actor_id", "fragment_id"],
654 registry
655 )
656 .unwrap();
657
658 let group_top_n_appendonly_total_query_cache_count =
659 register_guarded_int_counter_vec_with_registry!(
660 "stream_group_top_n_appendonly_total_query_cache_count",
661 "Group top n appendonly executor total cache count",
662 &["table_id", "actor_id", "fragment_id"],
663 registry
664 )
665 .unwrap();
666
667 let group_top_n_appendonly_cached_entry_count =
668 register_guarded_int_gauge_vec_with_registry!(
669 "stream_group_top_n_appendonly_cached_entry_count",
670 "Total entry counts in group top n appendonly executor cache",
671 &["table_id", "actor_id", "fragment_id"],
672 registry
673 )
674 .unwrap();
675
676 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
677 "stream_lookup_cache_miss_count",
678 "Lookup executor cache miss count",
679 &["table_id", "actor_id", "fragment_id"],
680 registry
681 )
682 .unwrap();
683
684 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
685 "stream_lookup_total_query_cache_count",
686 "Lookup executor query cache total count",
687 &["table_id", "actor_id", "fragment_id"],
688 registry
689 )
690 .unwrap();
691
692 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
693 "stream_lookup_cached_entry_count",
694 "Total entry counts in lookup executor cache",
695 &["table_id", "actor_id", "fragment_id"],
696 registry
697 )
698 .unwrap();
699
700 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
701 "stream_temporal_join_cache_miss_count",
702 "Temporal join executor cache miss count",
703 &["table_id", "actor_id", "fragment_id"],
704 registry
705 )
706 .unwrap();
707
708 let temporal_join_total_query_cache_count =
709 register_guarded_int_counter_vec_with_registry!(
710 "stream_temporal_join_total_query_cache_count",
711 "Temporal join executor query cache total count",
712 &["table_id", "actor_id", "fragment_id"],
713 registry
714 )
715 .unwrap();
716
717 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
718 "stream_temporal_join_cached_entry_count",
719 "Total entry count in temporal join executor cache",
720 &["table_id", "actor_id", "fragment_id"],
721 registry
722 )
723 .unwrap();
724
725 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
726 "stream_agg_cached_entry_count",
727 "Number of cached keys in streaming aggregation operators",
728 &["table_id", "actor_id", "fragment_id"],
729 registry
730 )
731 .unwrap();
732
733 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
734 "stream_agg_chunk_lookup_miss_count",
735 "Aggregation executor chunk-level lookup miss duration",
736 &["table_id", "actor_id", "fragment_id"],
737 registry
738 )
739 .unwrap();
740
741 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
742 "stream_agg_chunk_lookup_total_count",
743 "Aggregation executor chunk-level lookup total operation",
744 &["table_id", "actor_id", "fragment_id"],
745 registry
746 )
747 .unwrap();
748
749 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
750 "stream_backfill_snapshot_read_row_count",
751 "Total number of rows that have been read from the backfill snapshot",
752 &["table_id", "actor_id"],
753 registry
754 )
755 .unwrap();
756
757 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
758 "stream_backfill_upstream_output_row_count",
759 "Total number of rows that have been output from the backfill upstream",
760 &["table_id", "actor_id"],
761 registry
762 )
763 .unwrap();
764
765 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
766 "stream_cdc_backfill_snapshot_read_row_count",
767 "Total number of rows that have been read from the cdc_backfill snapshot",
768 &["table_id", "actor_id"],
769 registry
770 )
771 .unwrap();
772
773 let cdc_backfill_upstream_output_row_count =
774 register_guarded_int_counter_vec_with_registry!(
775 "stream_cdc_backfill_upstream_output_row_count",
776 "Total number of rows that have been output from the cdc_backfill upstream",
777 &["table_id", "actor_id"],
778 registry
779 )
780 .unwrap();
781
782 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
783 "stream_snapshot_backfill_consume_snapshot_row_count",
784 "Total number of rows that have been output from snapshot backfill",
785 &["table_id", "actor_id", "stage"],
786 registry
787 )
788 .unwrap();
789
790 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
791 "stream_over_window_cached_entry_count",
792 "Total entry (partition) count in over window executor cache",
793 &["table_id", "actor_id", "fragment_id"],
794 registry
795 )
796 .unwrap();
797
798 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
799 "stream_over_window_cache_lookup_count",
800 "Over window executor cache lookup count",
801 &["table_id", "actor_id", "fragment_id"],
802 registry
803 )
804 .unwrap();
805
806 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
807 "stream_over_window_cache_miss_count",
808 "Over window executor cache miss count",
809 &["table_id", "actor_id", "fragment_id"],
810 registry
811 )
812 .unwrap();
813
814 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
815 "stream_over_window_range_cache_entry_count",
816 "Over window partition range cache entry count",
817 &["table_id", "actor_id", "fragment_id"],
818 registry,
819 )
820 .unwrap();
821
822 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
823 "stream_over_window_range_cache_lookup_count",
824 "Over window partition range cache lookup count",
825 &["table_id", "actor_id", "fragment_id"],
826 registry
827 )
828 .unwrap();
829
830 let over_window_range_cache_left_miss_count =
831 register_guarded_int_counter_vec_with_registry!(
832 "stream_over_window_range_cache_left_miss_count",
833 "Over window partition range cache left miss count",
834 &["table_id", "actor_id", "fragment_id"],
835 registry
836 )
837 .unwrap();
838
839 let over_window_range_cache_right_miss_count =
840 register_guarded_int_counter_vec_with_registry!(
841 "stream_over_window_range_cache_right_miss_count",
842 "Over window partition range cache right miss count",
843 &["table_id", "actor_id", "fragment_id"],
844 registry
845 )
846 .unwrap();
847
848 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
849 "stream_over_window_accessed_entry_count",
850 "Over window accessed entry count",
851 &["table_id", "actor_id", "fragment_id"],
852 registry
853 )
854 .unwrap();
855
856 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
857 "stream_over_window_compute_count",
858 "Over window compute count",
859 &["table_id", "actor_id", "fragment_id"],
860 registry
861 )
862 .unwrap();
863
864 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
865 "stream_over_window_same_output_count",
866 "Over window same output count",
867 &["table_id", "actor_id", "fragment_id"],
868 registry
869 )
870 .unwrap();
871
872 let opts = histogram_opts!(
873 "stream_barrier_inflight_duration_seconds",
874 "barrier_inflight_latency",
875 exponential_buckets(0.1, 1.5, 16).unwrap() );
877 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
878
879 let opts = histogram_opts!(
880 "stream_barrier_sync_storage_duration_seconds",
881 "barrier_sync_latency",
882 exponential_buckets(0.1, 1.5, 16).unwrap() );
884 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
885
886 let opts = histogram_opts!(
887 "stream_barrier_batch_size",
888 "barrier_batch_size",
889 exponential_buckets(1.0, 2.0, 8).unwrap()
890 );
891 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
892
893 let barrier_manager_progress = register_int_counter_with_registry!(
894 "stream_barrier_manager_progress",
895 "The number of actors that have processed the earliest in-flight barriers",
896 registry
897 )
898 .unwrap();
899
900 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
901 "sync_kv_log_store_wait_next_poll_ns",
902 "Total duration (ns) of waiting for next poll",
903 &["actor_id", "target", "fragment_id", "relation"],
904 registry
905 )
906 .unwrap();
907
908 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
909 "sync_kv_log_store_read_count",
910 "read row count throughput of sync_kv log store",
911 &["type", "actor_id", "target", "fragment_id", "relation"],
912 registry
913 )
914 .unwrap();
915
916 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
917 "sync_kv_log_store_read_size",
918 "read size throughput of sync_kv log store",
919 &["type", "actor_id", "target", "fragment_id", "relation"],
920 registry
921 )
922 .unwrap();
923
924 let sync_kv_log_store_write_pause_duration_ns =
925 register_guarded_int_counter_vec_with_registry!(
926 "sync_kv_log_store_write_pause_duration_ns",
927 "Duration (ns) of sync_kv log store write pause",
928 &["actor_id", "target", "fragment_id", "relation"],
929 registry
930 )
931 .unwrap();
932
933 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
934 "sync_kv_log_store_state",
935 "clean/unclean state transition for sync_kv log store",
936 &["state", "actor_id", "target", "fragment_id", "relation"],
937 registry
938 )
939 .unwrap();
940
941 let sync_kv_log_store_storage_write_count =
942 register_guarded_int_counter_vec_with_registry!(
943 "sync_kv_log_store_storage_write_count",
944 "Write row count throughput of sync_kv log store",
945 &["actor_id", "target", "fragment_id", "relation"],
946 registry
947 )
948 .unwrap();
949
950 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
951 "sync_kv_log_store_storage_write_size",
952 "Write size throughput of sync_kv log store",
953 &["actor_id", "target", "fragment_id", "relation"],
954 registry
955 )
956 .unwrap();
957
958 let sync_kv_log_store_buffer_unconsumed_item_count =
959 register_guarded_int_gauge_vec_with_registry!(
960 "sync_kv_log_store_buffer_unconsumed_item_count",
961 "Number of Unconsumed Item in buffer",
962 &["actor_id", "target", "fragment_id", "relation"],
963 registry
964 )
965 .unwrap();
966
967 let sync_kv_log_store_buffer_unconsumed_row_count =
968 register_guarded_int_gauge_vec_with_registry!(
969 "sync_kv_log_store_buffer_unconsumed_row_count",
970 "Number of Unconsumed Row in buffer",
971 &["actor_id", "target", "fragment_id", "relation"],
972 registry
973 )
974 .unwrap();
975
976 let sync_kv_log_store_buffer_unconsumed_epoch_count =
977 register_guarded_int_gauge_vec_with_registry!(
978 "sync_kv_log_store_buffer_unconsumed_epoch_count",
979 "Number of Unconsumed Epoch in buffer",
980 &["actor_id", "target", "fragment_id", "relation"],
981 registry
982 )
983 .unwrap();
984
985 let sync_kv_log_store_buffer_unconsumed_min_epoch =
986 register_guarded_int_gauge_vec_with_registry!(
987 "sync_kv_log_store_buffer_unconsumed_min_epoch",
988 "Number of Unconsumed Epoch in buffer",
989 &["actor_id", "target", "fragment_id", "relation"],
990 registry
991 )
992 .unwrap();
993 let sync_kv_log_store_buffer_memory_bytes =
994 register_guarded_int_gauge_vec_with_registry!(
995 "sync_kv_log_store_buffer_memory_bytes",
996 "Estimated heap bytes used by synced kv log store buffer (unconsumed + consumed but not truncated)",
997 &["actor_id", "target", "fragment_id", "relation"],
998 registry
999 )
1000 .unwrap();
1001
1002 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1003 "kv_log_store_storage_write_count",
1004 "Write row count throughput of kv log store",
1005 &["actor_id", "connector", "sink_id", "sink_name"],
1006 registry
1007 )
1008 .unwrap();
1009
1010 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1011 "kv_log_store_storage_write_size",
1012 "Write size throughput of kv log store",
1013 &["actor_id", "connector", "sink_id", "sink_name"],
1014 registry
1015 )
1016 .unwrap();
1017
1018 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1019 "kv_log_store_storage_read_count",
1020 "Write row count throughput of kv log store",
1021 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1022 registry
1023 )
1024 .unwrap();
1025
1026 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1027 "kv_log_store_storage_read_size",
1028 "Write size throughput of kv log store",
1029 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1030 registry
1031 )
1032 .unwrap();
1033
1034 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1035 "kv_log_store_rewind_count",
1036 "Kv log store rewind rate",
1037 &["actor_id", "connector", "sink_id", "sink_name"],
1038 registry
1039 )
1040 .unwrap();
1041
1042 let kv_log_store_rewind_delay_opts = {
1043 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1044 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1045 - REWIND_BASE_DELAY.as_secs_f64().log2())
1046 .ceil() as usize;
1047 let buckets = exponential_buckets(
1048 REWIND_BASE_DELAY.as_secs_f64(),
1049 REWIND_BACKOFF_FACTOR as _,
1050 bucket_count,
1051 )
1052 .unwrap();
1053 histogram_opts!(
1054 "kv_log_store_rewind_delay",
1055 "Kv log store rewind delay",
1056 buckets,
1057 )
1058 };
1059
1060 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1061 kv_log_store_rewind_delay_opts,
1062 &["actor_id", "connector", "sink_id", "sink_name"],
1063 registry
1064 )
1065 .unwrap();
1066
1067 let kv_log_store_buffer_unconsumed_item_count =
1068 register_guarded_int_gauge_vec_with_registry!(
1069 "kv_log_store_buffer_unconsumed_item_count",
1070 "Number of Unconsumed Item in buffer",
1071 &["actor_id", "connector", "sink_id", "sink_name"],
1072 registry
1073 )
1074 .unwrap();
1075
1076 let kv_log_store_buffer_unconsumed_row_count =
1077 register_guarded_int_gauge_vec_with_registry!(
1078 "kv_log_store_buffer_unconsumed_row_count",
1079 "Number of Unconsumed Row in buffer",
1080 &["actor_id", "connector", "sink_id", "sink_name"],
1081 registry
1082 )
1083 .unwrap();
1084
1085 let kv_log_store_buffer_unconsumed_epoch_count =
1086 register_guarded_int_gauge_vec_with_registry!(
1087 "kv_log_store_buffer_unconsumed_epoch_count",
1088 "Number of Unconsumed Epoch in buffer",
1089 &["actor_id", "connector", "sink_id", "sink_name"],
1090 registry
1091 )
1092 .unwrap();
1093
1094 let kv_log_store_buffer_unconsumed_min_epoch =
1095 register_guarded_int_gauge_vec_with_registry!(
1096 "kv_log_store_buffer_unconsumed_min_epoch",
1097 "Number of Unconsumed Epoch in buffer",
1098 &["actor_id", "connector", "sink_id", "sink_name"],
1099 registry
1100 )
1101 .unwrap();
1102
1103 let crossdb_last_consumed_min_epoch = register_guarded_int_gauge_vec_with_registry!(
1104 "crossdb_last_consumed_min_epoch",
1105 "Last consumed min epoch for cross-database changelog stream scan",
1106 &["table_id", "actor_id", "fragment_id"],
1107 registry
1108 )
1109 .unwrap();
1110
1111 let kv_log_store_buffer_memory_bytes =
1112 register_guarded_int_gauge_vec_with_registry!(
1113 "kv_log_store_buffer_memory_bytes",
1114 "Estimated heap bytes used by kv log store buffer (unconsumed + consumed but not truncated)",
1115 &["actor_id", "connector", "sink_id", "sink_name"],
1116 registry
1117 )
1118 .unwrap();
1119
1120 let lru_runtime_loop_count = register_int_counter_with_registry!(
1121 "lru_runtime_loop_count",
1122 "The counts of the eviction loop in LRU manager per second",
1123 registry
1124 )
1125 .unwrap();
1126
1127 let lru_latest_sequence = register_int_gauge_with_registry!(
1128 "lru_latest_sequence",
1129 "Current LRU global sequence",
1130 registry,
1131 )
1132 .unwrap();
1133
1134 let lru_watermark_sequence = register_int_gauge_with_registry!(
1135 "lru_watermark_sequence",
1136 "Current LRU watermark sequence",
1137 registry,
1138 )
1139 .unwrap();
1140
1141 let lru_eviction_policy = register_int_gauge_with_registry!(
1142 "lru_eviction_policy",
1143 "Current LRU eviction policy",
1144 registry,
1145 )
1146 .unwrap();
1147
1148 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1149 "jemalloc_allocated_bytes",
1150 "The allocated memory jemalloc, got from jemalloc_ctl",
1151 registry
1152 )
1153 .unwrap();
1154
1155 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1156 "jemalloc_active_bytes",
1157 "The active memory jemalloc, got from jemalloc_ctl",
1158 registry
1159 )
1160 .unwrap();
1161
1162 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1163 "jemalloc_resident_bytes",
1164 "The active memory jemalloc, got from jemalloc_ctl",
1165 registry
1166 )
1167 .unwrap();
1168
1169 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1170 "jemalloc_metadata_bytes",
1171 "The active memory jemalloc, got from jemalloc_ctl",
1172 registry
1173 )
1174 .unwrap();
1175
1176 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1177 "jvm_allocated_bytes",
1178 "The allocated jvm memory",
1179 registry
1180 )
1181 .unwrap();
1182
1183 let jvm_active_bytes = register_int_gauge_with_registry!(
1184 "jvm_active_bytes",
1185 "The active jvm memory",
1186 registry
1187 )
1188 .unwrap();
1189
1190 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1191 "stream_materialize_cache_hit_count",
1192 "Materialize executor cache hit count",
1193 &["actor_id", "table_id", "fragment_id"],
1194 registry
1195 )
1196 .unwrap()
1197 .relabel_debug_1(level);
1198
1199 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1200 "stream_materialize_data_exist_count",
1201 "Materialize executor data exist count",
1202 &["actor_id", "table_id", "fragment_id"],
1203 registry
1204 )
1205 .unwrap()
1206 .relabel_debug_1(level);
1207
1208 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1209 "stream_materialize_cache_total_count",
1210 "Materialize executor cache total operation",
1211 &["actor_id", "table_id", "fragment_id"],
1212 registry
1213 )
1214 .unwrap()
1215 .relabel_debug_1(level);
1216
1217 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1218 "stream_memory_usage",
1219 "Memory usage for stream executors",
1220 &["actor_id", "table_id", "desc"],
1221 registry
1222 )
1223 .unwrap()
1224 .relabel_debug_1(level);
1225
1226 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1227 "gap_fill_generated_rows_count",
1228 "Total number of rows generated by gap fill executor",
1229 &["actor_id", "fragment_id"],
1230 registry
1231 )
1232 .unwrap()
1233 .relabel_debug_1(level);
1234
1235 let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1236 "state_table_iter_count",
1237 "Total number of state table iter operations",
1238 &["actor_id", "fragment_id", "table_id"],
1239 registry
1240 )
1241 .unwrap()
1242 .relabel_debug_1(level);
1243
1244 let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1245 "state_table_get_count",
1246 "Total number of state table get operations",
1247 &["actor_id", "fragment_id", "table_id"],
1248 registry
1249 )
1250 .unwrap()
1251 .relabel_debug_1(level);
1252
1253 let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1254 "state_table_iter_vnode_pruned_count",
1255 "Total number of state table iter operations pruned by vnode statistics",
1256 &["actor_id", "fragment_id", "table_id"],
1257 registry
1258 )
1259 .unwrap()
1260 .relabel_debug_1(level);
1261
1262 let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1263 "state_table_get_vnode_pruned_count",
1264 "Total number of state table get operations pruned by vnode statistics",
1265 &["actor_id", "fragment_id", "table_id"],
1266 registry
1267 )
1268 .unwrap()
1269 .relabel_debug_1(level);
1270
1271 Self {
1272 level,
1273 executor_row_count,
1274 mem_stream_node_output_row_count: stream_node_output_row_count,
1275 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1276 actor_scheduled_duration,
1277 actor_scheduled_cnt,
1278 actor_poll_duration,
1279 actor_poll_cnt,
1280 actor_idle_duration,
1281 actor_idle_cnt,
1282 actor_count,
1283 actor_in_record_cnt,
1284 actor_out_record_cnt,
1285 actor_current_epoch,
1286 source_output_row_count,
1287 source_split_change_count,
1288 source_backfill_row_count,
1289 sink_input_row_count,
1290 sink_input_bytes,
1291 sink_chunk_buffer_size,
1292 exchange_frag_recv_size,
1293 merge_barrier_align_duration,
1294 actor_output_buffer_blocking_duration_ns,
1295 actor_input_buffer_blocking_duration_ns,
1296 join_lookup_miss_count,
1297 join_lookup_total_count,
1298 join_insert_cache_miss_count,
1299 join_actor_input_waiting_duration_ns,
1300 join_match_duration_ns,
1301 join_cached_entry_count,
1302 join_matched_join_keys,
1303 barrier_align_duration,
1304 agg_lookup_miss_count,
1305 agg_total_lookup_count,
1306 agg_cached_entry_count,
1307 agg_chunk_lookup_miss_count,
1308 agg_chunk_total_lookup_count,
1309 agg_dirty_groups_count,
1310 agg_dirty_groups_heap_size,
1311 agg_distinct_cache_miss_count,
1312 agg_distinct_total_cache_count,
1313 agg_distinct_cached_entry_count,
1314 agg_state_cache_lookup_count,
1315 agg_state_cache_miss_count,
1316 group_top_n_cache_miss_count,
1317 group_top_n_total_query_cache_count,
1318 group_top_n_cached_entry_count,
1319 group_top_n_appendonly_cache_miss_count,
1320 group_top_n_appendonly_total_query_cache_count,
1321 group_top_n_appendonly_cached_entry_count,
1322 lookup_cache_miss_count,
1323 lookup_total_query_cache_count,
1324 lookup_cached_entry_count,
1325 temporal_join_cache_miss_count,
1326 temporal_join_total_query_cache_count,
1327 temporal_join_cached_entry_count,
1328 backfill_snapshot_read_row_count,
1329 backfill_upstream_output_row_count,
1330 cdc_backfill_snapshot_read_row_count,
1331 cdc_backfill_upstream_output_row_count,
1332 snapshot_backfill_consume_row_count,
1333 over_window_cached_entry_count,
1334 over_window_cache_lookup_count,
1335 over_window_cache_miss_count,
1336 over_window_range_cache_entry_count,
1337 over_window_range_cache_lookup_count,
1338 over_window_range_cache_left_miss_count,
1339 over_window_range_cache_right_miss_count,
1340 over_window_accessed_entry_count,
1341 over_window_compute_count,
1342 over_window_same_output_count,
1343 barrier_inflight_latency,
1344 barrier_sync_latency,
1345 barrier_batch_size,
1346 barrier_manager_progress,
1347 kv_log_store_storage_write_count,
1348 kv_log_store_storage_write_size,
1349 kv_log_store_rewind_count,
1350 kv_log_store_rewind_delay,
1351 kv_log_store_storage_read_count,
1352 kv_log_store_storage_read_size,
1353 kv_log_store_buffer_unconsumed_item_count,
1354 kv_log_store_buffer_unconsumed_row_count,
1355 kv_log_store_buffer_unconsumed_epoch_count,
1356 kv_log_store_buffer_unconsumed_min_epoch,
1357 kv_log_store_buffer_memory_bytes,
1358 crossdb_last_consumed_min_epoch,
1359 sync_kv_log_store_read_count,
1360 sync_kv_log_store_read_size,
1361 sync_kv_log_store_write_pause_duration_ns,
1362 sync_kv_log_store_state,
1363 sync_kv_log_store_wait_next_poll_ns,
1364 sync_kv_log_store_storage_write_count,
1365 sync_kv_log_store_storage_write_size,
1366 sync_kv_log_store_buffer_unconsumed_item_count,
1367 sync_kv_log_store_buffer_unconsumed_row_count,
1368 sync_kv_log_store_buffer_unconsumed_epoch_count,
1369 sync_kv_log_store_buffer_unconsumed_min_epoch,
1370 sync_kv_log_store_buffer_memory_bytes,
1371 lru_runtime_loop_count,
1372 lru_latest_sequence,
1373 lru_watermark_sequence,
1374 lru_eviction_policy,
1375 jemalloc_allocated_bytes,
1376 jemalloc_active_bytes,
1377 jemalloc_resident_bytes,
1378 jemalloc_metadata_bytes,
1379 jvm_allocated_bytes,
1380 jvm_active_bytes,
1381 stream_memory_usage,
1382 materialize_cache_hit_count,
1383 materialize_data_exist_count,
1384 materialize_cache_total_count,
1385 materialize_input_row_count,
1386 materialize_current_epoch,
1387 pg_cdc_state_table_lsn,
1388 pg_cdc_jni_commit_offset_lsn,
1389 mysql_cdc_state_binlog_file_seq,
1390 mysql_cdc_state_binlog_position,
1391 gap_fill_generated_rows_count,
1392 state_table_iter_count,
1393 state_table_get_count,
1394 state_table_iter_vnode_pruned_count,
1395 state_table_get_vnode_pruned_count,
1396 }
1397 }
1398
1399 pub fn unused() -> Self {
1401 global_streaming_metrics(MetricLevel::Disabled)
1402 }
1403
1404 pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1405 let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1406 let actor_scheduled_duration = self
1407 .actor_scheduled_duration
1408 .with_guarded_label_values(label_list);
1409 let actor_scheduled_cnt = self
1410 .actor_scheduled_cnt
1411 .with_guarded_label_values(label_list);
1412 let actor_poll_duration = self
1413 .actor_poll_duration
1414 .with_guarded_label_values(label_list);
1415 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1416 let actor_idle_duration = self
1417 .actor_idle_duration
1418 .with_guarded_label_values(label_list);
1419 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1420 ActorMetrics {
1421 actor_scheduled_duration,
1422 actor_scheduled_cnt,
1423 actor_poll_duration,
1424 actor_poll_cnt,
1425 actor_idle_duration,
1426 actor_idle_cnt,
1427 }
1428 }
1429
1430 pub(crate) fn new_actor_input_metrics(
1431 &self,
1432 actor_id: ActorId,
1433 fragment_id: FragmentId,
1434 upstream_fragment_id: FragmentId,
1435 ) -> ActorInputMetrics {
1436 let actor_id_str = actor_id.to_string();
1437 let fragment_id_str = fragment_id.to_string();
1438 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1439 ActorInputMetrics {
1440 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1441 &actor_id_str,
1442 &fragment_id_str,
1443 &upstream_fragment_id_str,
1444 ]),
1445 actor_input_buffer_blocking_duration_ns: self
1446 .actor_input_buffer_blocking_duration_ns
1447 .with_guarded_label_values(&[
1448 &actor_id_str,
1449 &fragment_id_str,
1450 &upstream_fragment_id_str,
1451 ]),
1452 }
1453 }
1454
1455 pub fn new_sink_exec_metrics(
1456 &self,
1457 id: SinkId,
1458 actor_id: ActorId,
1459 fragment_id: FragmentId,
1460 ) -> SinkExecutorMetrics {
1461 let label_list: &[&str; 3] = &[
1462 &id.to_string(),
1463 &actor_id.to_string(),
1464 &fragment_id.to_string(),
1465 ];
1466 SinkExecutorMetrics {
1467 sink_input_row_count: self
1468 .sink_input_row_count
1469 .with_guarded_label_values(label_list),
1470 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1471 sink_chunk_buffer_size: self
1472 .sink_chunk_buffer_size
1473 .with_guarded_label_values(label_list),
1474 }
1475 }
1476
1477 pub fn new_group_top_n_metrics(
1478 &self,
1479 table_id: TableId,
1480 actor_id: ActorId,
1481 fragment_id: FragmentId,
1482 ) -> GroupTopNMetrics {
1483 let label_list: &[&str; 3] = &[
1484 &table_id.to_string(),
1485 &actor_id.to_string(),
1486 &fragment_id.to_string(),
1487 ];
1488
1489 GroupTopNMetrics {
1490 group_top_n_cache_miss_count: self
1491 .group_top_n_cache_miss_count
1492 .with_guarded_label_values(label_list),
1493 group_top_n_total_query_cache_count: self
1494 .group_top_n_total_query_cache_count
1495 .with_guarded_label_values(label_list),
1496 group_top_n_cached_entry_count: self
1497 .group_top_n_cached_entry_count
1498 .with_guarded_label_values(label_list),
1499 }
1500 }
1501
1502 pub fn new_append_only_group_top_n_metrics(
1503 &self,
1504 table_id: TableId,
1505 actor_id: ActorId,
1506 fragment_id: FragmentId,
1507 ) -> GroupTopNMetrics {
1508 let label_list: &[&str; 3] = &[
1509 &table_id.to_string(),
1510 &actor_id.to_string(),
1511 &fragment_id.to_string(),
1512 ];
1513
1514 GroupTopNMetrics {
1515 group_top_n_cache_miss_count: self
1516 .group_top_n_appendonly_cache_miss_count
1517 .with_guarded_label_values(label_list),
1518 group_top_n_total_query_cache_count: self
1519 .group_top_n_appendonly_total_query_cache_count
1520 .with_guarded_label_values(label_list),
1521 group_top_n_cached_entry_count: self
1522 .group_top_n_appendonly_cached_entry_count
1523 .with_guarded_label_values(label_list),
1524 }
1525 }
1526
1527 pub fn new_lookup_executor_metrics(
1528 &self,
1529 table_id: TableId,
1530 actor_id: ActorId,
1531 fragment_id: FragmentId,
1532 ) -> LookupExecutorMetrics {
1533 let label_list: &[&str; 3] = &[
1534 &table_id.to_string(),
1535 &actor_id.to_string(),
1536 &fragment_id.to_string(),
1537 ];
1538
1539 LookupExecutorMetrics {
1540 lookup_cache_miss_count: self
1541 .lookup_cache_miss_count
1542 .with_guarded_label_values(label_list),
1543 lookup_total_query_cache_count: self
1544 .lookup_total_query_cache_count
1545 .with_guarded_label_values(label_list),
1546 lookup_cached_entry_count: self
1547 .lookup_cached_entry_count
1548 .with_guarded_label_values(label_list),
1549 }
1550 }
1551
1552 pub fn new_hash_agg_metrics(
1553 &self,
1554 table_id: TableId,
1555 actor_id: ActorId,
1556 fragment_id: FragmentId,
1557 ) -> HashAggMetrics {
1558 let label_list: &[&str; 3] = &[
1559 &table_id.to_string(),
1560 &actor_id.to_string(),
1561 &fragment_id.to_string(),
1562 ];
1563 HashAggMetrics {
1564 agg_lookup_miss_count: self
1565 .agg_lookup_miss_count
1566 .with_guarded_label_values(label_list),
1567 agg_total_lookup_count: self
1568 .agg_total_lookup_count
1569 .with_guarded_label_values(label_list),
1570 agg_cached_entry_count: self
1571 .agg_cached_entry_count
1572 .with_guarded_label_values(label_list),
1573 agg_chunk_lookup_miss_count: self
1574 .agg_chunk_lookup_miss_count
1575 .with_guarded_label_values(label_list),
1576 agg_chunk_total_lookup_count: self
1577 .agg_chunk_total_lookup_count
1578 .with_guarded_label_values(label_list),
1579 agg_dirty_groups_count: self
1580 .agg_dirty_groups_count
1581 .with_guarded_label_values(label_list),
1582 agg_dirty_groups_heap_size: self
1583 .agg_dirty_groups_heap_size
1584 .with_guarded_label_values(label_list),
1585 agg_state_cache_lookup_count: self
1586 .agg_state_cache_lookup_count
1587 .with_guarded_label_values(label_list),
1588 agg_state_cache_miss_count: self
1589 .agg_state_cache_miss_count
1590 .with_guarded_label_values(label_list),
1591 }
1592 }
1593
1594 pub fn new_agg_distinct_dedup_metrics(
1595 &self,
1596 table_id: TableId,
1597 actor_id: ActorId,
1598 fragment_id: FragmentId,
1599 ) -> AggDistinctDedupMetrics {
1600 let label_list: &[&str; 3] = &[
1601 &table_id.to_string(),
1602 &actor_id.to_string(),
1603 &fragment_id.to_string(),
1604 ];
1605 AggDistinctDedupMetrics {
1606 agg_distinct_cache_miss_count: self
1607 .agg_distinct_cache_miss_count
1608 .with_guarded_label_values(label_list),
1609 agg_distinct_total_cache_count: self
1610 .agg_distinct_total_cache_count
1611 .with_guarded_label_values(label_list),
1612 agg_distinct_cached_entry_count: self
1613 .agg_distinct_cached_entry_count
1614 .with_guarded_label_values(label_list),
1615 }
1616 }
1617
1618 pub fn new_temporal_join_metrics(
1619 &self,
1620 table_id: TableId,
1621 actor_id: ActorId,
1622 fragment_id: FragmentId,
1623 ) -> TemporalJoinMetrics {
1624 let label_list: &[&str; 3] = &[
1625 &table_id.to_string(),
1626 &actor_id.to_string(),
1627 &fragment_id.to_string(),
1628 ];
1629 TemporalJoinMetrics {
1630 temporal_join_cache_miss_count: self
1631 .temporal_join_cache_miss_count
1632 .with_guarded_label_values(label_list),
1633 temporal_join_total_query_cache_count: self
1634 .temporal_join_total_query_cache_count
1635 .with_guarded_label_values(label_list),
1636 temporal_join_cached_entry_count: self
1637 .temporal_join_cached_entry_count
1638 .with_guarded_label_values(label_list),
1639 }
1640 }
1641
1642 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1643 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1644 BackfillMetrics {
1645 backfill_snapshot_read_row_count: self
1646 .backfill_snapshot_read_row_count
1647 .with_guarded_label_values(label_list),
1648 backfill_upstream_output_row_count: self
1649 .backfill_upstream_output_row_count
1650 .with_guarded_label_values(label_list),
1651 }
1652 }
1653
1654 pub fn new_cdc_backfill_metrics(
1655 &self,
1656 table_id: TableId,
1657 actor_id: ActorId,
1658 ) -> CdcBackfillMetrics {
1659 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1660 CdcBackfillMetrics {
1661 cdc_backfill_snapshot_read_row_count: self
1662 .cdc_backfill_snapshot_read_row_count
1663 .with_guarded_label_values(label_list),
1664 cdc_backfill_upstream_output_row_count: self
1665 .cdc_backfill_upstream_output_row_count
1666 .with_guarded_label_values(label_list),
1667 }
1668 }
1669
1670 pub fn new_over_window_metrics(
1671 &self,
1672 table_id: TableId,
1673 actor_id: ActorId,
1674 fragment_id: FragmentId,
1675 ) -> OverWindowMetrics {
1676 let label_list: &[&str; 3] = &[
1677 &table_id.to_string(),
1678 &actor_id.to_string(),
1679 &fragment_id.to_string(),
1680 ];
1681 OverWindowMetrics {
1682 over_window_cached_entry_count: self
1683 .over_window_cached_entry_count
1684 .with_guarded_label_values(label_list),
1685 over_window_cache_lookup_count: self
1686 .over_window_cache_lookup_count
1687 .with_guarded_label_values(label_list),
1688 over_window_cache_miss_count: self
1689 .over_window_cache_miss_count
1690 .with_guarded_label_values(label_list),
1691 over_window_range_cache_entry_count: self
1692 .over_window_range_cache_entry_count
1693 .with_guarded_label_values(label_list),
1694 over_window_range_cache_lookup_count: self
1695 .over_window_range_cache_lookup_count
1696 .with_guarded_label_values(label_list),
1697 over_window_range_cache_left_miss_count: self
1698 .over_window_range_cache_left_miss_count
1699 .with_guarded_label_values(label_list),
1700 over_window_range_cache_right_miss_count: self
1701 .over_window_range_cache_right_miss_count
1702 .with_guarded_label_values(label_list),
1703 over_window_accessed_entry_count: self
1704 .over_window_accessed_entry_count
1705 .with_guarded_label_values(label_list),
1706 over_window_compute_count: self
1707 .over_window_compute_count
1708 .with_guarded_label_values(label_list),
1709 over_window_same_output_count: self
1710 .over_window_same_output_count
1711 .with_guarded_label_values(label_list),
1712 }
1713 }
1714
1715 pub fn new_materialize_cache_metrics(
1716 &self,
1717 table_id: TableId,
1718 actor_id: ActorId,
1719 fragment_id: FragmentId,
1720 ) -> MaterializeCacheMetrics {
1721 let label_list: &[&str; 3] = &[
1722 &actor_id.to_string(),
1723 &table_id.to_string(),
1724 &fragment_id.to_string(),
1725 ];
1726 MaterializeCacheMetrics {
1727 materialize_cache_hit_count: self
1728 .materialize_cache_hit_count
1729 .with_guarded_label_values(label_list),
1730 materialize_data_exist_count: self
1731 .materialize_data_exist_count
1732 .with_guarded_label_values(label_list),
1733 materialize_cache_total_count: self
1734 .materialize_cache_total_count
1735 .with_guarded_label_values(label_list),
1736 }
1737 }
1738
1739 pub fn new_materialize_metrics(
1740 &self,
1741 table_id: TableId,
1742 actor_id: ActorId,
1743 fragment_id: FragmentId,
1744 ) -> MaterializeMetrics {
1745 let label_list: &[&str; 3] = &[
1746 &actor_id.to_string(),
1747 &table_id.to_string(),
1748 &fragment_id.to_string(),
1749 ];
1750 MaterializeMetrics {
1751 materialize_input_row_count: self
1752 .materialize_input_row_count
1753 .with_guarded_label_values(label_list),
1754 materialize_current_epoch: self
1755 .materialize_current_epoch
1756 .with_guarded_label_values(label_list),
1757 }
1758 }
1759
1760 pub fn new_state_table_metrics(
1761 &self,
1762 table_id: TableId,
1763 actor_id: ActorId,
1764 fragment_id: FragmentId,
1765 ) -> StateTableMetrics {
1766 let label_list: &[&str; 3] = &[
1767 &actor_id.to_string(),
1768 &fragment_id.to_string(),
1769 &table_id.to_string(),
1770 ];
1771 StateTableMetrics {
1772 iter_count: self
1773 .state_table_iter_count
1774 .with_guarded_label_values(label_list),
1775 get_count: self
1776 .state_table_get_count
1777 .with_guarded_label_values(label_list),
1778 iter_vnode_pruned_count: self
1779 .state_table_iter_vnode_pruned_count
1780 .with_guarded_label_values(label_list),
1781 get_vnode_pruned_count: self
1782 .state_table_get_vnode_pruned_count
1783 .with_guarded_label_values(label_list),
1784 }
1785 }
1786}
1787
/// Input-side metric handles of one actor for one upstream fragment.
///
/// Created by `StreamingMetrics::new_actor_input_metrics`; both handles carry the label
/// values `[actor_id, fragment_id, upstream_fragment_id]`.
pub(crate) struct ActorInputMetrics {
    /// Handle into the `actor_in_record_cnt` counter vector.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Handle into the `actor_input_buffer_blocking_duration_ns` counter vector
    /// (unit presumably nanoseconds, per the name — confirm at the recording site).
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1792
/// Per-actor scheduling, poll and idle counters.
///
/// Created by `StreamingMetrics::new_actor_metrics`; every handle carries the label values
/// `[actor_id, fragment_id]`.
pub struct ActorMetrics {
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    pub actor_poll_duration: LabelGuardedIntCounter,
    pub actor_poll_cnt: LabelGuardedIntCounter,
    pub actor_idle_duration: LabelGuardedIntCounter,
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1802
/// Metric handles of one sink executor.
///
/// Created by `StreamingMetrics::new_sink_exec_metrics`; every handle carries the label
/// values `[sink_id, actor_id, fragment_id]`.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// A gauge (not a counter), so it can go down as well as up.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1808
/// Cache counters of one materialize executor.
///
/// Created by `StreamingMetrics::new_materialize_cache_metrics` with the label values
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeCacheMetrics {
    /// Handle into `stream_materialize_cache_hit_count` ("Materialize executor cache hit count").
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// Handle into `stream_materialize_data_exist_count` ("Materialize executor data exist count").
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// Handle into `stream_materialize_cache_total_count` ("Materialize executor cache total operation").
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1814
/// Input-row counter and current-epoch gauge of one materialize executor.
///
/// Created by `StreamingMetrics::new_materialize_metrics` with the label values
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeMetrics {
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1819
/// Cache metrics of a group top-N executor.
///
/// Shared by both flavours. `StreamingMetrics::new_group_top_n_metrics` fills it from the
/// `group_top_n_*` vectors, and `StreamingMetrics::new_append_only_group_top_n_metrics`
/// fills it from the `group_top_n_appendonly_*` vectors. Label values are
/// `[table_id, actor_id, fragment_id]` in both cases.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1825
/// Cache metrics of one lookup executor.
///
/// Created by `StreamingMetrics::new_lookup_executor_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1831
/// Metric handles of one hash aggregation executor.
///
/// Created by `StreamingMetrics::new_hash_agg_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct HashAggMetrics {
    // Group cache lookups.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    // Per-chunk lookups.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    // Dirty groups (gauges).
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    // Aggregation state cache.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1843
/// Cache metrics of the distinct-aggregation deduplicator.
///
/// Created by `StreamingMetrics::new_agg_distinct_dedup_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1849
/// Cache metrics of one temporal join executor.
///
/// Created by `StreamingMetrics::new_temporal_join_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1855
/// Row counters of one backfill executor.
///
/// Created by `StreamingMetrics::new_backfill_metrics` with the label values
/// `[table_id, actor_id]` (no fragment label).
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1860
/// Row counters of one CDC backfill executor.
///
/// Created by `StreamingMetrics::new_cdc_backfill_metrics` with the label values
/// `[table_id, actor_id]`. Unlike the other per-executor metric structs here it derives
/// `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1866
/// Metric handles of one over-window executor.
///
/// Created by `StreamingMetrics::new_over_window_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct OverWindowMetrics {
    // Cache.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    // Range cache; misses are tracked separately for the left and right side.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    // Computation.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}
1879
/// Operation counters of one state table.
///
/// Created by `StreamingMetrics::new_state_table_metrics` with the label values
/// `[actor_id, fragment_id, table_id]`.
pub struct StateTableMetrics {
    /// Handle into `state_table_iter_count` ("Total number of state table iter operations").
    pub iter_count: LabelGuardedIntCounter,
    /// Handle into `state_table_get_count` ("Total number of state table get operations").
    pub get_count: LabelGuardedIntCounter,
    /// Handle into `state_table_iter_vnode_pruned_count`: iter operations pruned by vnode
    /// statistics.
    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
    /// Handle into `state_table_get_vnode_pruned_count`: get operations pruned by vnode
    /// statistics.
    pub get_vnode_pruned_count: LabelGuardedIntCounter,
}