1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26 LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Prometheus (and in-memory) metrics reported by the streaming engine.
///
/// `StreamingMetrics::new` registers every metric into the given `Registry`.
/// Use `global_streaming_metrics` to get the process-wide shared instance.
/// Fields typed `Relabeled*` are wrapped with `relabel_debug_1(level)` in `new`.
///
/// NOTE(review): Rust drops struct fields in declaration order. Keep the order
/// stable unless you have confirmed the label-guarded metric types do not
/// depend on it.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level this instance was built with. Presumably this is the `level`
    /// passed to `new`, which also drives the `relabel_debug_1` wrappers.
    pub level: MetricLevel,

    // ---- Executor ----
    /// `stream_executor_row_count`: total rows output from each executor.
    /// Labels: `actor_id`, `fragment_id`, `executor_identity`.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // ---- In-memory stream node statistics (`CountMap`, not registry-backed) ----
    /// In-memory output row counts (a `CountMap`).
    pub mem_stream_node_output_row_count: CountMap,
    /// In-memory output blocking duration in nanoseconds (a `CountMap`).
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // ---- Actor scheduling ("tokio's metrics"); labels: `actor_id`, `fragment_id` ----
    /// `stream_actor_scheduled_duration`.
    actor_scheduled_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_scheduled_cnt`.
    actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_poll_duration`.
    actor_poll_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_poll_cnt`.
    actor_poll_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_idle_duration`.
    actor_idle_duration: RelabeledGuardedIntCounterVec,
    /// `stream_actor_idle_cnt`.
    actor_idle_cnt: RelabeledGuardedIntCounterVec,

    // ---- Actor ----
    /// `stream_actor_count`: total number of actors (parallelism). Label: `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: rows received by an actor.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: rows sent by an actor. Labels: `actor_id`, `fragment_id`.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor. Labels: `actor_id`, `fragment_id`.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Source; labels: `source_id`, `source_name`, `actor_id`, `fragment_id` ----
    /// `stream_source_output_rows_counts`: rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled by a source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // ---- Sink; labels: `sink_id`, `actor_id`, `fragment_id` ----
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // ---- Exchange ----
    /// `stream_exchange_frag_recv_size`: size of messages received from an upstream fragment.
    /// Labels: `up_fragment_id`, `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // ---- Merge ----
    /// `stream_merge_barrier_align_duration` histogram.
    /// Buckets: `exponential_buckets(0.0001, 2.0, 21)`. Labels: `actor_id`, `fragment_id`.
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // ---- Actor buffers ----
    /// `stream_actor_output_buffer_blocking_duration_ns`: blocking time (ns) of the output buffer.
    /// Labels: `actor_id`, `fragment_id`, `downstream_fragment_id`.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`: blocking time (ns) of the input buffer.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,

    // ---- Streaming join ----
    /// `stream_join_lookup_miss_count`. Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    /// NOTE(review): its registered help text says "miss duration", but this is a miss counter.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`. Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert.
    /// Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: input-buffer waiting time (ns) of join actor.
    /// Labels: `actor_id`, `fragment_id`.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: matching duration per side.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count`: cached entries in join operators.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys` histogram: keys matched in the opposite side.
    /// Buckets: `exponential_buckets(16.0, 2.0, 28)`. Labels: `actor_id`, `fragment_id`, `table_id`.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // ---- Barrier alignment ----
    /// `stream_barrier_align_duration_ns`. Labels: `actor_id`, `fragment_id`, `wait_side`,
    /// `executor`. NOTE(review): help text says "join" although an `executor` label exists.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Aggregation; labels: `table_id`, `actor_id`, `fragment_id` ----
    /// `stream_agg_lookup_miss_count`. NOTE(review): help text says "miss duration" for a counter.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_lookup_total_count`.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_cached_entry_count`: cached keys in aggregation operators.
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_chunk_lookup_miss_count` (chunk-level).
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_chunk_lookup_total_count` (chunk-level).
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_dirty_groups_count`: dirty group count.
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_dirty_groups_heap_size`: dirty group heap size.
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    /// `stream_agg_distinct_cache_miss_count`.
    /// NOTE(review): its registered help text misspells "distinct" as "dinsinct".
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_total_cache_count`.
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_cached_entry_count`.
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_state_cache_lookup_count`.
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_state_cache_miss_count`.
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // ---- Group top-N; labels: `table_id`, `actor_id`, `fragment_id` ----
    /// `stream_group_top_n_cache_miss_count`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_total_query_cache_count`.
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_cached_entry_count`.
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_group_top_n_appendonly_cache_miss_count`.
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_total_query_cache_count`.
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_cached_entry_count`.
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Lookup executor; labels: `table_id`, `actor_id`, `fragment_id` ----
    /// `stream_lookup_cache_miss_count`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_total_query_cache_count`.
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_cached_entry_count`.
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Temporal join; labels: `table_id`, `actor_id`, `fragment_id` ----
    /// `stream_temporal_join_cache_miss_count`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_total_query_cache_count`.
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_cached_entry_count`.
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Backfill; labels: `table_id`, `actor_id` ----
    /// `stream_backfill_snapshot_read_row_count`: rows read from the backfill snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_backfill_upstream_output_row_count`: rows output from the backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- CDC backfill; labels: `table_id`, `actor_id` ----
    /// `stream_cdc_backfill_snapshot_read_row_count`.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_cdc_backfill_upstream_output_row_count`.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- Snapshot backfill ----
    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output from snapshot
    /// backfill. Labels: `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // ---- Over window; labels: `table_id`, `actor_id`, `fragment_id` ----
    /// `stream_over_window_cached_entry_count`: cached entries (partitions).
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_cache_lookup_count`.
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_cache_miss_count`.
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_entry_count`.
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_range_cache_lookup_count`.
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_left_miss_count`.
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_right_miss_count`.
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_accessed_entry_count`.
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_compute_count`.
    over_window_compute_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_same_output_count`.
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // ---- Barrier manager (unlabeled) ----
    /// `stream_barrier_inflight_duration_seconds` histogram,
    /// buckets `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds` histogram,
    /// buckets `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size` histogram, buckets `exponential_buckets(1.0, 2.0, 8)`.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have processed the
    /// earliest in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // ---- KV log store ----
    /// `kv_log_store_storage_write_count`: write row count.
    /// Labels: `actor_id`, `connector`, `sink_id`, `sink_name`.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// `kv_log_store_storage_write_size`: write size.
    /// Labels: `actor_id`, `connector`, `sink_id`, `sink_name`.
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    // The remaining kv log store metrics are registered further down in `new`.
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Sync KV log store; metric name == field name ----
    // Common labels: `actor_id`, `target`, `fragment_id`, `relation`.
    /// Read row count; has an extra leading `type` label.
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// Read size; has an extra leading `type` label.
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// Duration (ns) of write pauses.
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// Clean/unclean state transitions; has an extra leading `state` label.
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// Total duration (ns) spent waiting for the next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// Write row count.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// Write size.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// Number of unconsumed items in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// Number of unconsumed rows in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// Number of unconsumed epochs in the buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// NOTE(review): its registered help text duplicates `..._unconsumed_epoch_count`
    /// ("Number of Unconsumed Epoch in buffer"). The name suggests it tracks the minimum
    /// unconsumed epoch; confirm and fix the help text.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Memory management: LRU runtime, allocator and JVM gauges (per field names) ----
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // ---- Materialize ----
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: rows streamed into materialize executors.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- PostgreSQL CDC; label: `source_id` ----
    /// `stream_pg_cdc_state_table_lsn`: LSN currently stored in the CDC state table.
    pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
    /// `stream_pg_cdc_jni_commit_offset_lsn`: LSN at which the JNI commit-offset call happened.
    pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,

    // ---- MySQL CDC; label: `source_id` ----
    /// `stream_mysql_cdc_state_binlog_file_seq`: binlog file sequence stored in the state table.
    pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
    /// `stream_mysql_cdc_state_binlog_position`: binlog position stored in the state table.
    pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,

    // ---- Gap fill ----
    pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
}
223
/// Process-wide `StreamingMetrics` instance.
///
/// It is lazily filled by `global_streaming_metrics` through `OnceLock::get_or_init`,
/// so the metrics are registered into `GLOBAL_METRICS_REGISTRY` at most once.
pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
225
226pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
227 GLOBAL_STREAMING_METRICS
228 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
229 .clone()
230}
231
232impl StreamingMetrics {
233 fn new(registry: &Registry, level: MetricLevel) -> Self {
234 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
235 "stream_executor_row_count",
236 "Total number of rows that have been output from each executor",
237 &["actor_id", "fragment_id", "executor_identity"],
238 registry
239 )
240 .unwrap()
241 .relabel_debug_1(level);
242
243 let stream_node_output_row_count = CountMap::new();
244 let stream_node_output_blocking_duration_ns = CountMap::new();
245
246 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
247 "stream_source_output_rows_counts",
248 "Total number of rows that have been output from source",
249 &["source_id", "source_name", "actor_id", "fragment_id"],
250 registry
251 )
252 .unwrap();
253
254 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
255 "stream_source_split_change_event_count",
256 "Total number of split change events that have been operated by source",
257 &["source_id", "source_name", "actor_id", "fragment_id"],
258 registry
259 )
260 .unwrap();
261
262 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
263 "stream_source_backfill_rows_counts",
264 "Total number of rows that have been backfilled for source",
265 &["source_id", "source_name", "actor_id", "fragment_id"],
266 registry
267 )
268 .unwrap();
269
270 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
271 "stream_sink_input_row_count",
272 "Total number of rows streamed into sink executors",
273 &["sink_id", "actor_id", "fragment_id"],
274 registry
275 )
276 .unwrap();
277
278 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
279 "stream_sink_input_bytes",
280 "Total size of chunks streamed into sink executors",
281 &["sink_id", "actor_id", "fragment_id"],
282 registry
283 )
284 .unwrap();
285
286 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
287 "stream_mview_input_row_count",
288 "Total number of rows streamed into materialize executors",
289 &["actor_id", "table_id", "fragment_id"],
290 registry
291 )
292 .unwrap()
293 .relabel_debug_1(level);
294
295 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
296 "stream_mview_current_epoch",
297 "The current epoch of the materialized executor",
298 &["actor_id", "table_id", "fragment_id"],
299 registry
300 )
301 .unwrap()
302 .relabel_debug_1(level);
303
304 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
305 "stream_pg_cdc_state_table_lsn",
306 "Current LSN value stored in PostgreSQL CDC state table",
307 &["source_id"],
308 registry,
309 )
310 .unwrap();
311
312 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
313 "stream_pg_cdc_jni_commit_offset_lsn",
314 "LSN value when JNI commit offset is called for PostgreSQL CDC",
315 &["source_id"],
316 registry,
317 )
318 .unwrap();
319
320 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
321 "stream_mysql_cdc_state_binlog_file_seq",
322 "Current binlog file sequence number stored in MySQL CDC state table",
323 &["source_id"],
324 registry,
325 )
326 .unwrap();
327
328 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
329 "stream_mysql_cdc_state_binlog_position",
330 "Current binlog position stored in MySQL CDC state table",
331 &["source_id"],
332 registry,
333 )
334 .unwrap();
335
336 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
337 "stream_sink_chunk_buffer_size",
338 "Total size of chunks buffered in a barrier",
339 &["sink_id", "actor_id", "fragment_id"],
340 registry
341 )
342 .unwrap();
343 let actor_output_buffer_blocking_duration_ns =
344 register_guarded_int_counter_vec_with_registry!(
345 "stream_actor_output_buffer_blocking_duration_ns",
346 "Total blocking duration (ns) of output buffer",
347 &["actor_id", "fragment_id", "downstream_fragment_id"],
348 registry
349 )
350 .unwrap()
351 .relabel_debug_1(level);
353
354 let actor_input_buffer_blocking_duration_ns =
355 register_guarded_int_counter_vec_with_registry!(
356 "stream_actor_input_buffer_blocking_duration_ns",
357 "Total blocking duration (ns) of input buffer",
358 &["actor_id", "fragment_id", "upstream_fragment_id"],
359 registry
360 )
361 .unwrap()
362 .relabel_debug_1(level);
364
365 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
366 "stream_exchange_frag_recv_size",
367 "Total size of messages that have been received from upstream Fragment",
368 &["up_fragment_id", "down_fragment_id"],
369 registry
370 )
371 .unwrap();
372
373 let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
374 "stream_actor_poll_duration",
375 "tokio's metrics",
376 &["actor_id", "fragment_id"],
377 registry
378 )
379 .unwrap()
380 .relabel_debug_1(level);
381
382 let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
383 "stream_actor_poll_cnt",
384 "tokio's metrics",
385 &["actor_id", "fragment_id"],
386 registry
387 )
388 .unwrap()
389 .relabel_debug_1(level);
390
391 let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
392 "stream_actor_scheduled_duration",
393 "tokio's metrics",
394 &["actor_id", "fragment_id"],
395 registry
396 )
397 .unwrap()
398 .relabel_debug_1(level);
399
400 let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
401 "stream_actor_scheduled_cnt",
402 "tokio's metrics",
403 &["actor_id", "fragment_id"],
404 registry
405 )
406 .unwrap()
407 .relabel_debug_1(level);
408
409 let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
410 "stream_actor_idle_duration",
411 "tokio's metrics",
412 &["actor_id", "fragment_id"],
413 registry
414 )
415 .unwrap()
416 .relabel_debug_1(level);
417
418 let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
419 "stream_actor_idle_cnt",
420 "tokio's metrics",
421 &["actor_id", "fragment_id"],
422 registry
423 )
424 .unwrap()
425 .relabel_debug_1(level);
426
427 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
428 "stream_actor_in_record_cnt",
429 "Total number of rows actor received",
430 &["actor_id", "fragment_id", "upstream_fragment_id"],
431 registry
432 )
433 .unwrap()
434 .relabel_debug_1(level);
435
436 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
437 "stream_actor_out_record_cnt",
438 "Total number of rows actor sent",
439 &["actor_id", "fragment_id"],
440 registry
441 )
442 .unwrap()
443 .relabel_debug_1(level);
444
445 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
446 "stream_actor_current_epoch",
447 "Current epoch of actor",
448 &["actor_id", "fragment_id"],
449 registry
450 )
451 .unwrap()
452 .relabel_debug_1(level);
453
454 let actor_count = register_guarded_int_gauge_vec_with_registry!(
455 "stream_actor_count",
456 "Total number of actors (parallelism)",
457 &["fragment_id"],
458 registry
459 )
460 .unwrap();
461
462 let opts = histogram_opts!(
463 "stream_merge_barrier_align_duration",
464 "Duration of merge align barrier",
465 exponential_buckets(0.0001, 2.0, 21).unwrap() );
467 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
468 opts,
469 &["actor_id", "fragment_id"],
470 registry
471 )
472 .unwrap()
473 .relabel_debug_1(level);
474
475 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
476 "stream_join_lookup_miss_count",
477 "Join executor lookup miss duration",
478 &["side", "join_table_id", "actor_id", "fragment_id"],
479 registry
480 )
481 .unwrap();
482
483 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
484 "stream_join_lookup_total_count",
485 "Join executor lookup total operation",
486 &["side", "join_table_id", "actor_id", "fragment_id"],
487 registry
488 )
489 .unwrap();
490
491 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
492 "stream_join_insert_cache_miss_count",
493 "Join executor cache miss when insert operation",
494 &["side", "join_table_id", "actor_id", "fragment_id"],
495 registry
496 )
497 .unwrap();
498
499 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
500 "stream_join_actor_input_waiting_duration_ns",
501 "Total waiting duration (ns) of input buffer of join actor",
502 &["actor_id", "fragment_id"],
503 registry
504 )
505 .unwrap();
506
507 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
508 "stream_join_match_duration_ns",
509 "Matching duration for each side",
510 &["actor_id", "fragment_id", "side"],
511 registry
512 )
513 .unwrap();
514
515 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
516 "stream_barrier_align_duration_ns",
517 "Duration of join align barrier",
518 &["actor_id", "fragment_id", "wait_side", "executor"],
519 registry
520 )
521 .unwrap()
522 .relabel_debug_1(level);
523
524 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
525 "stream_join_cached_entry_count",
526 "Number of cached entries in streaming join operators",
527 &["actor_id", "fragment_id", "side"],
528 registry
529 )
530 .unwrap();
531
532 let join_matched_join_keys_opts = histogram_opts!(
533 "stream_join_matched_join_keys",
534 "The number of keys matched in the opposite side",
535 exponential_buckets(16.0, 2.0, 28).unwrap() );
537
538 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
539 join_matched_join_keys_opts,
540 &["actor_id", "fragment_id", "table_id"],
541 registry
542 )
543 .unwrap()
544 .relabel_debug_1(level);
545
546 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
547 "stream_agg_lookup_miss_count",
548 "Aggregation executor lookup miss duration",
549 &["table_id", "actor_id", "fragment_id"],
550 registry
551 )
552 .unwrap();
553
554 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
555 "stream_agg_lookup_total_count",
556 "Aggregation executor lookup total operation",
557 &["table_id", "actor_id", "fragment_id"],
558 registry
559 )
560 .unwrap();
561
562 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
563 "stream_agg_distinct_cache_miss_count",
564 "Aggregation executor dinsinct miss duration",
565 &["table_id", "actor_id", "fragment_id"],
566 registry
567 )
568 .unwrap();
569
570 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
571 "stream_agg_distinct_total_cache_count",
572 "Aggregation executor distinct total operation",
573 &["table_id", "actor_id", "fragment_id"],
574 registry
575 )
576 .unwrap();
577
578 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
579 "stream_agg_distinct_cached_entry_count",
580 "Total entry counts in distinct aggregation executor cache",
581 &["table_id", "actor_id", "fragment_id"],
582 registry
583 )
584 .unwrap();
585
586 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
587 "stream_agg_dirty_groups_count",
588 "Total dirty group counts in aggregation executor",
589 &["table_id", "actor_id", "fragment_id"],
590 registry
591 )
592 .unwrap();
593
594 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
595 "stream_agg_dirty_groups_heap_size",
596 "Total dirty group heap size in aggregation executor",
597 &["table_id", "actor_id", "fragment_id"],
598 registry
599 )
600 .unwrap();
601
602 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
603 "stream_agg_state_cache_lookup_count",
604 "Aggregation executor state cache lookup count",
605 &["table_id", "actor_id", "fragment_id"],
606 registry
607 )
608 .unwrap();
609
610 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
611 "stream_agg_state_cache_miss_count",
612 "Aggregation executor state cache miss count",
613 &["table_id", "actor_id", "fragment_id"],
614 registry
615 )
616 .unwrap();
617
618 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
619 "stream_group_top_n_cache_miss_count",
620 "Group top n executor cache miss count",
621 &["table_id", "actor_id", "fragment_id"],
622 registry
623 )
624 .unwrap();
625
626 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
627 "stream_group_top_n_total_query_cache_count",
628 "Group top n executor query cache total count",
629 &["table_id", "actor_id", "fragment_id"],
630 registry
631 )
632 .unwrap();
633
634 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
635 "stream_group_top_n_cached_entry_count",
636 "Total entry counts in group top n executor cache",
637 &["table_id", "actor_id", "fragment_id"],
638 registry
639 )
640 .unwrap();
641
642 let group_top_n_appendonly_cache_miss_count =
643 register_guarded_int_counter_vec_with_registry!(
644 "stream_group_top_n_appendonly_cache_miss_count",
645 "Group top n appendonly executor cache miss count",
646 &["table_id", "actor_id", "fragment_id"],
647 registry
648 )
649 .unwrap();
650
651 let group_top_n_appendonly_total_query_cache_count =
652 register_guarded_int_counter_vec_with_registry!(
653 "stream_group_top_n_appendonly_total_query_cache_count",
654 "Group top n appendonly executor total cache count",
655 &["table_id", "actor_id", "fragment_id"],
656 registry
657 )
658 .unwrap();
659
660 let group_top_n_appendonly_cached_entry_count =
661 register_guarded_int_gauge_vec_with_registry!(
662 "stream_group_top_n_appendonly_cached_entry_count",
663 "Total entry counts in group top n appendonly executor cache",
664 &["table_id", "actor_id", "fragment_id"],
665 registry
666 )
667 .unwrap();
668
669 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
670 "stream_lookup_cache_miss_count",
671 "Lookup executor cache miss count",
672 &["table_id", "actor_id", "fragment_id"],
673 registry
674 )
675 .unwrap();
676
677 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
678 "stream_lookup_total_query_cache_count",
679 "Lookup executor query cache total count",
680 &["table_id", "actor_id", "fragment_id"],
681 registry
682 )
683 .unwrap();
684
685 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
686 "stream_lookup_cached_entry_count",
687 "Total entry counts in lookup executor cache",
688 &["table_id", "actor_id", "fragment_id"],
689 registry
690 )
691 .unwrap();
692
693 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
694 "stream_temporal_join_cache_miss_count",
695 "Temporal join executor cache miss count",
696 &["table_id", "actor_id", "fragment_id"],
697 registry
698 )
699 .unwrap();
700
701 let temporal_join_total_query_cache_count =
702 register_guarded_int_counter_vec_with_registry!(
703 "stream_temporal_join_total_query_cache_count",
704 "Temporal join executor query cache total count",
705 &["table_id", "actor_id", "fragment_id"],
706 registry
707 )
708 .unwrap();
709
710 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
711 "stream_temporal_join_cached_entry_count",
712 "Total entry count in temporal join executor cache",
713 &["table_id", "actor_id", "fragment_id"],
714 registry
715 )
716 .unwrap();
717
718 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
719 "stream_agg_cached_entry_count",
720 "Number of cached keys in streaming aggregation operators",
721 &["table_id", "actor_id", "fragment_id"],
722 registry
723 )
724 .unwrap();
725
726 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
727 "stream_agg_chunk_lookup_miss_count",
728 "Aggregation executor chunk-level lookup miss duration",
729 &["table_id", "actor_id", "fragment_id"],
730 registry
731 )
732 .unwrap();
733
734 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
735 "stream_agg_chunk_lookup_total_count",
736 "Aggregation executor chunk-level lookup total operation",
737 &["table_id", "actor_id", "fragment_id"],
738 registry
739 )
740 .unwrap();
741
742 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
743 "stream_backfill_snapshot_read_row_count",
744 "Total number of rows that have been read from the backfill snapshot",
745 &["table_id", "actor_id"],
746 registry
747 )
748 .unwrap();
749
750 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
751 "stream_backfill_upstream_output_row_count",
752 "Total number of rows that have been output from the backfill upstream",
753 &["table_id", "actor_id"],
754 registry
755 )
756 .unwrap();
757
758 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
759 "stream_cdc_backfill_snapshot_read_row_count",
760 "Total number of rows that have been read from the cdc_backfill snapshot",
761 &["table_id", "actor_id"],
762 registry
763 )
764 .unwrap();
765
766 let cdc_backfill_upstream_output_row_count =
767 register_guarded_int_counter_vec_with_registry!(
768 "stream_cdc_backfill_upstream_output_row_count",
769 "Total number of rows that have been output from the cdc_backfill upstream",
770 &["table_id", "actor_id"],
771 registry
772 )
773 .unwrap();
774
775 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
776 "stream_snapshot_backfill_consume_snapshot_row_count",
777 "Total number of rows that have been output from snapshot backfill",
778 &["table_id", "actor_id", "stage"],
779 registry
780 )
781 .unwrap();
782
783 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
784 "stream_over_window_cached_entry_count",
785 "Total entry (partition) count in over window executor cache",
786 &["table_id", "actor_id", "fragment_id"],
787 registry
788 )
789 .unwrap();
790
791 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
792 "stream_over_window_cache_lookup_count",
793 "Over window executor cache lookup count",
794 &["table_id", "actor_id", "fragment_id"],
795 registry
796 )
797 .unwrap();
798
799 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
800 "stream_over_window_cache_miss_count",
801 "Over window executor cache miss count",
802 &["table_id", "actor_id", "fragment_id"],
803 registry
804 )
805 .unwrap();
806
807 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
808 "stream_over_window_range_cache_entry_count",
809 "Over window partition range cache entry count",
810 &["table_id", "actor_id", "fragment_id"],
811 registry,
812 )
813 .unwrap();
814
815 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
816 "stream_over_window_range_cache_lookup_count",
817 "Over window partition range cache lookup count",
818 &["table_id", "actor_id", "fragment_id"],
819 registry
820 )
821 .unwrap();
822
823 let over_window_range_cache_left_miss_count =
824 register_guarded_int_counter_vec_with_registry!(
825 "stream_over_window_range_cache_left_miss_count",
826 "Over window partition range cache left miss count",
827 &["table_id", "actor_id", "fragment_id"],
828 registry
829 )
830 .unwrap();
831
832 let over_window_range_cache_right_miss_count =
833 register_guarded_int_counter_vec_with_registry!(
834 "stream_over_window_range_cache_right_miss_count",
835 "Over window partition range cache right miss count",
836 &["table_id", "actor_id", "fragment_id"],
837 registry
838 )
839 .unwrap();
840
841 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
842 "stream_over_window_accessed_entry_count",
843 "Over window accessed entry count",
844 &["table_id", "actor_id", "fragment_id"],
845 registry
846 )
847 .unwrap();
848
849 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
850 "stream_over_window_compute_count",
851 "Over window compute count",
852 &["table_id", "actor_id", "fragment_id"],
853 registry
854 )
855 .unwrap();
856
857 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
858 "stream_over_window_same_output_count",
859 "Over window same output count",
860 &["table_id", "actor_id", "fragment_id"],
861 registry
862 )
863 .unwrap();
864
865 let opts = histogram_opts!(
866 "stream_barrier_inflight_duration_seconds",
867 "barrier_inflight_latency",
868 exponential_buckets(0.1, 1.5, 16).unwrap() );
870 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
871
872 let opts = histogram_opts!(
873 "stream_barrier_sync_storage_duration_seconds",
874 "barrier_sync_latency",
875 exponential_buckets(0.1, 1.5, 16).unwrap() );
877 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
878
879 let opts = histogram_opts!(
880 "stream_barrier_batch_size",
881 "barrier_batch_size",
882 exponential_buckets(1.0, 2.0, 8).unwrap()
883 );
884 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
885
886 let barrier_manager_progress = register_int_counter_with_registry!(
887 "stream_barrier_manager_progress",
888 "The number of actors that have processed the earliest in-flight barriers",
889 registry
890 )
891 .unwrap();
892
893 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
894 "sync_kv_log_store_wait_next_poll_ns",
895 "Total duration (ns) of waiting for next poll",
896 &["actor_id", "target", "fragment_id", "relation"],
897 registry
898 )
899 .unwrap();
900
901 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
902 "sync_kv_log_store_read_count",
903 "read row count throughput of sync_kv log store",
904 &["type", "actor_id", "target", "fragment_id", "relation"],
905 registry
906 )
907 .unwrap();
908
909 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
910 "sync_kv_log_store_read_size",
911 "read size throughput of sync_kv log store",
912 &["type", "actor_id", "target", "fragment_id", "relation"],
913 registry
914 )
915 .unwrap();
916
917 let sync_kv_log_store_write_pause_duration_ns =
918 register_guarded_int_counter_vec_with_registry!(
919 "sync_kv_log_store_write_pause_duration_ns",
920 "Duration (ns) of sync_kv log store write pause",
921 &["actor_id", "target", "fragment_id", "relation"],
922 registry
923 )
924 .unwrap();
925
926 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
927 "sync_kv_log_store_state",
928 "clean/unclean state transition for sync_kv log store",
929 &["state", "actor_id", "target", "fragment_id", "relation"],
930 registry
931 )
932 .unwrap();
933
934 let sync_kv_log_store_storage_write_count =
935 register_guarded_int_counter_vec_with_registry!(
936 "sync_kv_log_store_storage_write_count",
937 "Write row count throughput of sync_kv log store",
938 &["actor_id", "target", "fragment_id", "relation"],
939 registry
940 )
941 .unwrap();
942
943 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
944 "sync_kv_log_store_storage_write_size",
945 "Write size throughput of sync_kv log store",
946 &["actor_id", "target", "fragment_id", "relation"],
947 registry
948 )
949 .unwrap();
950
951 let sync_kv_log_store_buffer_unconsumed_item_count =
952 register_guarded_int_gauge_vec_with_registry!(
953 "sync_kv_log_store_buffer_unconsumed_item_count",
954 "Number of Unconsumed Item in buffer",
955 &["actor_id", "target", "fragment_id", "relation"],
956 registry
957 )
958 .unwrap();
959
960 let sync_kv_log_store_buffer_unconsumed_row_count =
961 register_guarded_int_gauge_vec_with_registry!(
962 "sync_kv_log_store_buffer_unconsumed_row_count",
963 "Number of Unconsumed Row in buffer",
964 &["actor_id", "target", "fragment_id", "relation"],
965 registry
966 )
967 .unwrap();
968
969 let sync_kv_log_store_buffer_unconsumed_epoch_count =
970 register_guarded_int_gauge_vec_with_registry!(
971 "sync_kv_log_store_buffer_unconsumed_epoch_count",
972 "Number of Unconsumed Epoch in buffer",
973 &["actor_id", "target", "fragment_id", "relation"],
974 registry
975 )
976 .unwrap();
977
978 let sync_kv_log_store_buffer_unconsumed_min_epoch =
979 register_guarded_int_gauge_vec_with_registry!(
980 "sync_kv_log_store_buffer_unconsumed_min_epoch",
981 "Number of Unconsumed Epoch in buffer",
982 &["actor_id", "target", "fragment_id", "relation"],
983 registry
984 )
985 .unwrap();
986
987 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
988 "kv_log_store_storage_write_count",
989 "Write row count throughput of kv log store",
990 &["actor_id", "connector", "sink_id", "sink_name"],
991 registry
992 )
993 .unwrap();
994
995 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
996 "kv_log_store_storage_write_size",
997 "Write size throughput of kv log store",
998 &["actor_id", "connector", "sink_id", "sink_name"],
999 registry
1000 )
1001 .unwrap();
1002
1003 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1004 "kv_log_store_storage_read_count",
1005 "Write row count throughput of kv log store",
1006 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1007 registry
1008 )
1009 .unwrap();
1010
1011 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1012 "kv_log_store_storage_read_size",
1013 "Write size throughput of kv log store",
1014 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1015 registry
1016 )
1017 .unwrap();
1018
1019 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1020 "kv_log_store_rewind_count",
1021 "Kv log store rewind rate",
1022 &["actor_id", "connector", "sink_id", "sink_name"],
1023 registry
1024 )
1025 .unwrap();
1026
1027 let kv_log_store_rewind_delay_opts = {
1028 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1029 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1030 - REWIND_BASE_DELAY.as_secs_f64().log2())
1031 .ceil() as usize;
1032 let buckets = exponential_buckets(
1033 REWIND_BASE_DELAY.as_secs_f64(),
1034 REWIND_BACKOFF_FACTOR as _,
1035 bucket_count,
1036 )
1037 .unwrap();
1038 histogram_opts!(
1039 "kv_log_store_rewind_delay",
1040 "Kv log store rewind delay",
1041 buckets,
1042 )
1043 };
1044
1045 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1046 kv_log_store_rewind_delay_opts,
1047 &["actor_id", "connector", "sink_id", "sink_name"],
1048 registry
1049 )
1050 .unwrap();
1051
1052 let kv_log_store_buffer_unconsumed_item_count =
1053 register_guarded_int_gauge_vec_with_registry!(
1054 "kv_log_store_buffer_unconsumed_item_count",
1055 "Number of Unconsumed Item in buffer",
1056 &["actor_id", "connector", "sink_id", "sink_name"],
1057 registry
1058 )
1059 .unwrap();
1060
1061 let kv_log_store_buffer_unconsumed_row_count =
1062 register_guarded_int_gauge_vec_with_registry!(
1063 "kv_log_store_buffer_unconsumed_row_count",
1064 "Number of Unconsumed Row in buffer",
1065 &["actor_id", "connector", "sink_id", "sink_name"],
1066 registry
1067 )
1068 .unwrap();
1069
1070 let kv_log_store_buffer_unconsumed_epoch_count =
1071 register_guarded_int_gauge_vec_with_registry!(
1072 "kv_log_store_buffer_unconsumed_epoch_count",
1073 "Number of Unconsumed Epoch in buffer",
1074 &["actor_id", "connector", "sink_id", "sink_name"],
1075 registry
1076 )
1077 .unwrap();
1078
1079 let kv_log_store_buffer_unconsumed_min_epoch =
1080 register_guarded_int_gauge_vec_with_registry!(
1081 "kv_log_store_buffer_unconsumed_min_epoch",
1082 "Number of Unconsumed Epoch in buffer",
1083 &["actor_id", "connector", "sink_id", "sink_name"],
1084 registry
1085 )
1086 .unwrap();
1087
1088 let lru_runtime_loop_count = register_int_counter_with_registry!(
1089 "lru_runtime_loop_count",
1090 "The counts of the eviction loop in LRU manager per second",
1091 registry
1092 )
1093 .unwrap();
1094
1095 let lru_latest_sequence = register_int_gauge_with_registry!(
1096 "lru_latest_sequence",
1097 "Current LRU global sequence",
1098 registry,
1099 )
1100 .unwrap();
1101
1102 let lru_watermark_sequence = register_int_gauge_with_registry!(
1103 "lru_watermark_sequence",
1104 "Current LRU watermark sequence",
1105 registry,
1106 )
1107 .unwrap();
1108
1109 let lru_eviction_policy = register_int_gauge_with_registry!(
1110 "lru_eviction_policy",
1111 "Current LRU eviction policy",
1112 registry,
1113 )
1114 .unwrap();
1115
1116 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1117 "jemalloc_allocated_bytes",
1118 "The allocated memory jemalloc, got from jemalloc_ctl",
1119 registry
1120 )
1121 .unwrap();
1122
1123 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1124 "jemalloc_active_bytes",
1125 "The active memory jemalloc, got from jemalloc_ctl",
1126 registry
1127 )
1128 .unwrap();
1129
1130 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1131 "jemalloc_resident_bytes",
1132 "The active memory jemalloc, got from jemalloc_ctl",
1133 registry
1134 )
1135 .unwrap();
1136
1137 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1138 "jemalloc_metadata_bytes",
1139 "The active memory jemalloc, got from jemalloc_ctl",
1140 registry
1141 )
1142 .unwrap();
1143
1144 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1145 "jvm_allocated_bytes",
1146 "The allocated jvm memory",
1147 registry
1148 )
1149 .unwrap();
1150
1151 let jvm_active_bytes = register_int_gauge_with_registry!(
1152 "jvm_active_bytes",
1153 "The active jvm memory",
1154 registry
1155 )
1156 .unwrap();
1157
1158 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1159 "stream_materialize_cache_hit_count",
1160 "Materialize executor cache hit count",
1161 &["actor_id", "table_id", "fragment_id"],
1162 registry
1163 )
1164 .unwrap()
1165 .relabel_debug_1(level);
1166
1167 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1168 "stream_materialize_data_exist_count",
1169 "Materialize executor data exist count",
1170 &["actor_id", "table_id", "fragment_id"],
1171 registry
1172 )
1173 .unwrap()
1174 .relabel_debug_1(level);
1175
1176 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1177 "stream_materialize_cache_total_count",
1178 "Materialize executor cache total operation",
1179 &["actor_id", "table_id", "fragment_id"],
1180 registry
1181 )
1182 .unwrap()
1183 .relabel_debug_1(level);
1184
1185 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1186 "stream_memory_usage",
1187 "Memory usage for stream executors",
1188 &["actor_id", "table_id", "desc"],
1189 registry
1190 )
1191 .unwrap()
1192 .relabel_debug_1(level);
1193
1194 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1195 "gap_fill_generated_rows_count",
1196 "Total number of rows generated by gap fill executor",
1197 &["actor_id", "fragment_id"],
1198 registry
1199 )
1200 .unwrap()
1201 .relabel_debug_1(level);
1202
1203 Self {
1204 level,
1205 executor_row_count,
1206 mem_stream_node_output_row_count: stream_node_output_row_count,
1207 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1208 actor_scheduled_duration,
1209 actor_scheduled_cnt,
1210 actor_poll_duration,
1211 actor_poll_cnt,
1212 actor_idle_duration,
1213 actor_idle_cnt,
1214 actor_count,
1215 actor_in_record_cnt,
1216 actor_out_record_cnt,
1217 actor_current_epoch,
1218 source_output_row_count,
1219 source_split_change_count,
1220 source_backfill_row_count,
1221 sink_input_row_count,
1222 sink_input_bytes,
1223 sink_chunk_buffer_size,
1224 exchange_frag_recv_size,
1225 merge_barrier_align_duration,
1226 actor_output_buffer_blocking_duration_ns,
1227 actor_input_buffer_blocking_duration_ns,
1228 join_lookup_miss_count,
1229 join_lookup_total_count,
1230 join_insert_cache_miss_count,
1231 join_actor_input_waiting_duration_ns,
1232 join_match_duration_ns,
1233 join_cached_entry_count,
1234 join_matched_join_keys,
1235 barrier_align_duration,
1236 agg_lookup_miss_count,
1237 agg_total_lookup_count,
1238 agg_cached_entry_count,
1239 agg_chunk_lookup_miss_count,
1240 agg_chunk_total_lookup_count,
1241 agg_dirty_groups_count,
1242 agg_dirty_groups_heap_size,
1243 agg_distinct_cache_miss_count,
1244 agg_distinct_total_cache_count,
1245 agg_distinct_cached_entry_count,
1246 agg_state_cache_lookup_count,
1247 agg_state_cache_miss_count,
1248 group_top_n_cache_miss_count,
1249 group_top_n_total_query_cache_count,
1250 group_top_n_cached_entry_count,
1251 group_top_n_appendonly_cache_miss_count,
1252 group_top_n_appendonly_total_query_cache_count,
1253 group_top_n_appendonly_cached_entry_count,
1254 lookup_cache_miss_count,
1255 lookup_total_query_cache_count,
1256 lookup_cached_entry_count,
1257 temporal_join_cache_miss_count,
1258 temporal_join_total_query_cache_count,
1259 temporal_join_cached_entry_count,
1260 backfill_snapshot_read_row_count,
1261 backfill_upstream_output_row_count,
1262 cdc_backfill_snapshot_read_row_count,
1263 cdc_backfill_upstream_output_row_count,
1264 snapshot_backfill_consume_row_count,
1265 over_window_cached_entry_count,
1266 over_window_cache_lookup_count,
1267 over_window_cache_miss_count,
1268 over_window_range_cache_entry_count,
1269 over_window_range_cache_lookup_count,
1270 over_window_range_cache_left_miss_count,
1271 over_window_range_cache_right_miss_count,
1272 over_window_accessed_entry_count,
1273 over_window_compute_count,
1274 over_window_same_output_count,
1275 barrier_inflight_latency,
1276 barrier_sync_latency,
1277 barrier_batch_size,
1278 barrier_manager_progress,
1279 kv_log_store_storage_write_count,
1280 kv_log_store_storage_write_size,
1281 kv_log_store_rewind_count,
1282 kv_log_store_rewind_delay,
1283 kv_log_store_storage_read_count,
1284 kv_log_store_storage_read_size,
1285 kv_log_store_buffer_unconsumed_item_count,
1286 kv_log_store_buffer_unconsumed_row_count,
1287 kv_log_store_buffer_unconsumed_epoch_count,
1288 kv_log_store_buffer_unconsumed_min_epoch,
1289 sync_kv_log_store_read_count,
1290 sync_kv_log_store_read_size,
1291 sync_kv_log_store_write_pause_duration_ns,
1292 sync_kv_log_store_state,
1293 sync_kv_log_store_wait_next_poll_ns,
1294 sync_kv_log_store_storage_write_count,
1295 sync_kv_log_store_storage_write_size,
1296 sync_kv_log_store_buffer_unconsumed_item_count,
1297 sync_kv_log_store_buffer_unconsumed_row_count,
1298 sync_kv_log_store_buffer_unconsumed_epoch_count,
1299 sync_kv_log_store_buffer_unconsumed_min_epoch,
1300 lru_runtime_loop_count,
1301 lru_latest_sequence,
1302 lru_watermark_sequence,
1303 lru_eviction_policy,
1304 jemalloc_allocated_bytes,
1305 jemalloc_active_bytes,
1306 jemalloc_resident_bytes,
1307 jemalloc_metadata_bytes,
1308 jvm_allocated_bytes,
1309 jvm_active_bytes,
1310 stream_memory_usage,
1311 materialize_cache_hit_count,
1312 materialize_data_exist_count,
1313 materialize_cache_total_count,
1314 materialize_input_row_count,
1315 materialize_current_epoch,
1316 pg_cdc_state_table_lsn,
1317 pg_cdc_jni_commit_offset_lsn,
1318 mysql_cdc_state_binlog_file_seq,
1319 mysql_cdc_state_binlog_position,
1320 gap_fill_generated_rows_count,
1321 }
1322 }
1323
1324 pub fn unused() -> Self {
1326 global_streaming_metrics(MetricLevel::Disabled)
1327 }
1328
1329 pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1330 let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1331 let actor_scheduled_duration = self
1332 .actor_scheduled_duration
1333 .with_guarded_label_values(label_list);
1334 let actor_scheduled_cnt = self
1335 .actor_scheduled_cnt
1336 .with_guarded_label_values(label_list);
1337 let actor_poll_duration = self
1338 .actor_poll_duration
1339 .with_guarded_label_values(label_list);
1340 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1341 let actor_idle_duration = self
1342 .actor_idle_duration
1343 .with_guarded_label_values(label_list);
1344 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1345 ActorMetrics {
1346 actor_scheduled_duration,
1347 actor_scheduled_cnt,
1348 actor_poll_duration,
1349 actor_poll_cnt,
1350 actor_idle_duration,
1351 actor_idle_cnt,
1352 }
1353 }
1354
1355 pub(crate) fn new_actor_input_metrics(
1356 &self,
1357 actor_id: ActorId,
1358 fragment_id: FragmentId,
1359 upstream_fragment_id: FragmentId,
1360 ) -> ActorInputMetrics {
1361 let actor_id_str = actor_id.to_string();
1362 let fragment_id_str = fragment_id.to_string();
1363 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1364 ActorInputMetrics {
1365 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1366 &actor_id_str,
1367 &fragment_id_str,
1368 &upstream_fragment_id_str,
1369 ]),
1370 actor_input_buffer_blocking_duration_ns: self
1371 .actor_input_buffer_blocking_duration_ns
1372 .with_guarded_label_values(&[
1373 &actor_id_str,
1374 &fragment_id_str,
1375 &upstream_fragment_id_str,
1376 ]),
1377 }
1378 }
1379
1380 pub fn new_sink_exec_metrics(
1381 &self,
1382 id: SinkId,
1383 actor_id: ActorId,
1384 fragment_id: FragmentId,
1385 ) -> SinkExecutorMetrics {
1386 let label_list: &[&str; 3] = &[
1387 &id.to_string(),
1388 &actor_id.to_string(),
1389 &fragment_id.to_string(),
1390 ];
1391 SinkExecutorMetrics {
1392 sink_input_row_count: self
1393 .sink_input_row_count
1394 .with_guarded_label_values(label_list),
1395 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1396 sink_chunk_buffer_size: self
1397 .sink_chunk_buffer_size
1398 .with_guarded_label_values(label_list),
1399 }
1400 }
1401
1402 pub fn new_group_top_n_metrics(
1403 &self,
1404 table_id: TableId,
1405 actor_id: ActorId,
1406 fragment_id: FragmentId,
1407 ) -> GroupTopNMetrics {
1408 let label_list: &[&str; 3] = &[
1409 &table_id.to_string(),
1410 &actor_id.to_string(),
1411 &fragment_id.to_string(),
1412 ];
1413
1414 GroupTopNMetrics {
1415 group_top_n_cache_miss_count: self
1416 .group_top_n_cache_miss_count
1417 .with_guarded_label_values(label_list),
1418 group_top_n_total_query_cache_count: self
1419 .group_top_n_total_query_cache_count
1420 .with_guarded_label_values(label_list),
1421 group_top_n_cached_entry_count: self
1422 .group_top_n_cached_entry_count
1423 .with_guarded_label_values(label_list),
1424 }
1425 }
1426
1427 pub fn new_append_only_group_top_n_metrics(
1428 &self,
1429 table_id: TableId,
1430 actor_id: ActorId,
1431 fragment_id: FragmentId,
1432 ) -> GroupTopNMetrics {
1433 let label_list: &[&str; 3] = &[
1434 &table_id.to_string(),
1435 &actor_id.to_string(),
1436 &fragment_id.to_string(),
1437 ];
1438
1439 GroupTopNMetrics {
1440 group_top_n_cache_miss_count: self
1441 .group_top_n_appendonly_cache_miss_count
1442 .with_guarded_label_values(label_list),
1443 group_top_n_total_query_cache_count: self
1444 .group_top_n_appendonly_total_query_cache_count
1445 .with_guarded_label_values(label_list),
1446 group_top_n_cached_entry_count: self
1447 .group_top_n_appendonly_cached_entry_count
1448 .with_guarded_label_values(label_list),
1449 }
1450 }
1451
1452 pub fn new_lookup_executor_metrics(
1453 &self,
1454 table_id: TableId,
1455 actor_id: ActorId,
1456 fragment_id: FragmentId,
1457 ) -> LookupExecutorMetrics {
1458 let label_list: &[&str; 3] = &[
1459 &table_id.to_string(),
1460 &actor_id.to_string(),
1461 &fragment_id.to_string(),
1462 ];
1463
1464 LookupExecutorMetrics {
1465 lookup_cache_miss_count: self
1466 .lookup_cache_miss_count
1467 .with_guarded_label_values(label_list),
1468 lookup_total_query_cache_count: self
1469 .lookup_total_query_cache_count
1470 .with_guarded_label_values(label_list),
1471 lookup_cached_entry_count: self
1472 .lookup_cached_entry_count
1473 .with_guarded_label_values(label_list),
1474 }
1475 }
1476
1477 pub fn new_hash_agg_metrics(
1478 &self,
1479 table_id: TableId,
1480 actor_id: ActorId,
1481 fragment_id: FragmentId,
1482 ) -> HashAggMetrics {
1483 let label_list: &[&str; 3] = &[
1484 &table_id.to_string(),
1485 &actor_id.to_string(),
1486 &fragment_id.to_string(),
1487 ];
1488 HashAggMetrics {
1489 agg_lookup_miss_count: self
1490 .agg_lookup_miss_count
1491 .with_guarded_label_values(label_list),
1492 agg_total_lookup_count: self
1493 .agg_total_lookup_count
1494 .with_guarded_label_values(label_list),
1495 agg_cached_entry_count: self
1496 .agg_cached_entry_count
1497 .with_guarded_label_values(label_list),
1498 agg_chunk_lookup_miss_count: self
1499 .agg_chunk_lookup_miss_count
1500 .with_guarded_label_values(label_list),
1501 agg_chunk_total_lookup_count: self
1502 .agg_chunk_total_lookup_count
1503 .with_guarded_label_values(label_list),
1504 agg_dirty_groups_count: self
1505 .agg_dirty_groups_count
1506 .with_guarded_label_values(label_list),
1507 agg_dirty_groups_heap_size: self
1508 .agg_dirty_groups_heap_size
1509 .with_guarded_label_values(label_list),
1510 agg_state_cache_lookup_count: self
1511 .agg_state_cache_lookup_count
1512 .with_guarded_label_values(label_list),
1513 agg_state_cache_miss_count: self
1514 .agg_state_cache_miss_count
1515 .with_guarded_label_values(label_list),
1516 }
1517 }
1518
1519 pub fn new_agg_distinct_dedup_metrics(
1520 &self,
1521 table_id: TableId,
1522 actor_id: ActorId,
1523 fragment_id: FragmentId,
1524 ) -> AggDistinctDedupMetrics {
1525 let label_list: &[&str; 3] = &[
1526 &table_id.to_string(),
1527 &actor_id.to_string(),
1528 &fragment_id.to_string(),
1529 ];
1530 AggDistinctDedupMetrics {
1531 agg_distinct_cache_miss_count: self
1532 .agg_distinct_cache_miss_count
1533 .with_guarded_label_values(label_list),
1534 agg_distinct_total_cache_count: self
1535 .agg_distinct_total_cache_count
1536 .with_guarded_label_values(label_list),
1537 agg_distinct_cached_entry_count: self
1538 .agg_distinct_cached_entry_count
1539 .with_guarded_label_values(label_list),
1540 }
1541 }
1542
1543 pub fn new_temporal_join_metrics(
1544 &self,
1545 table_id: TableId,
1546 actor_id: ActorId,
1547 fragment_id: FragmentId,
1548 ) -> TemporalJoinMetrics {
1549 let label_list: &[&str; 3] = &[
1550 &table_id.to_string(),
1551 &actor_id.to_string(),
1552 &fragment_id.to_string(),
1553 ];
1554 TemporalJoinMetrics {
1555 temporal_join_cache_miss_count: self
1556 .temporal_join_cache_miss_count
1557 .with_guarded_label_values(label_list),
1558 temporal_join_total_query_cache_count: self
1559 .temporal_join_total_query_cache_count
1560 .with_guarded_label_values(label_list),
1561 temporal_join_cached_entry_count: self
1562 .temporal_join_cached_entry_count
1563 .with_guarded_label_values(label_list),
1564 }
1565 }
1566
1567 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1568 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1569 BackfillMetrics {
1570 backfill_snapshot_read_row_count: self
1571 .backfill_snapshot_read_row_count
1572 .with_guarded_label_values(label_list),
1573 backfill_upstream_output_row_count: self
1574 .backfill_upstream_output_row_count
1575 .with_guarded_label_values(label_list),
1576 }
1577 }
1578
1579 pub fn new_cdc_backfill_metrics(
1580 &self,
1581 table_id: TableId,
1582 actor_id: ActorId,
1583 ) -> CdcBackfillMetrics {
1584 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1585 CdcBackfillMetrics {
1586 cdc_backfill_snapshot_read_row_count: self
1587 .cdc_backfill_snapshot_read_row_count
1588 .with_guarded_label_values(label_list),
1589 cdc_backfill_upstream_output_row_count: self
1590 .cdc_backfill_upstream_output_row_count
1591 .with_guarded_label_values(label_list),
1592 }
1593 }
1594
1595 pub fn new_over_window_metrics(
1596 &self,
1597 table_id: TableId,
1598 actor_id: ActorId,
1599 fragment_id: FragmentId,
1600 ) -> OverWindowMetrics {
1601 let label_list: &[&str; 3] = &[
1602 &table_id.to_string(),
1603 &actor_id.to_string(),
1604 &fragment_id.to_string(),
1605 ];
1606 OverWindowMetrics {
1607 over_window_cached_entry_count: self
1608 .over_window_cached_entry_count
1609 .with_guarded_label_values(label_list),
1610 over_window_cache_lookup_count: self
1611 .over_window_cache_lookup_count
1612 .with_guarded_label_values(label_list),
1613 over_window_cache_miss_count: self
1614 .over_window_cache_miss_count
1615 .with_guarded_label_values(label_list),
1616 over_window_range_cache_entry_count: self
1617 .over_window_range_cache_entry_count
1618 .with_guarded_label_values(label_list),
1619 over_window_range_cache_lookup_count: self
1620 .over_window_range_cache_lookup_count
1621 .with_guarded_label_values(label_list),
1622 over_window_range_cache_left_miss_count: self
1623 .over_window_range_cache_left_miss_count
1624 .with_guarded_label_values(label_list),
1625 over_window_range_cache_right_miss_count: self
1626 .over_window_range_cache_right_miss_count
1627 .with_guarded_label_values(label_list),
1628 over_window_accessed_entry_count: self
1629 .over_window_accessed_entry_count
1630 .with_guarded_label_values(label_list),
1631 over_window_compute_count: self
1632 .over_window_compute_count
1633 .with_guarded_label_values(label_list),
1634 over_window_same_output_count: self
1635 .over_window_same_output_count
1636 .with_guarded_label_values(label_list),
1637 }
1638 }
1639
1640 pub fn new_materialize_cache_metrics(
1641 &self,
1642 table_id: TableId,
1643 actor_id: ActorId,
1644 fragment_id: FragmentId,
1645 ) -> MaterializeCacheMetrics {
1646 let label_list: &[&str; 3] = &[
1647 &actor_id.to_string(),
1648 &table_id.to_string(),
1649 &fragment_id.to_string(),
1650 ];
1651 MaterializeCacheMetrics {
1652 materialize_cache_hit_count: self
1653 .materialize_cache_hit_count
1654 .with_guarded_label_values(label_list),
1655 materialize_data_exist_count: self
1656 .materialize_data_exist_count
1657 .with_guarded_label_values(label_list),
1658 materialize_cache_total_count: self
1659 .materialize_cache_total_count
1660 .with_guarded_label_values(label_list),
1661 }
1662 }
1663
1664 pub fn new_materialize_metrics(
1665 &self,
1666 table_id: TableId,
1667 actor_id: ActorId,
1668 fragment_id: FragmentId,
1669 ) -> MaterializeMetrics {
1670 let label_list: &[&str; 3] = &[
1671 &actor_id.to_string(),
1672 &table_id.to_string(),
1673 &fragment_id.to_string(),
1674 ];
1675 MaterializeMetrics {
1676 materialize_input_row_count: self
1677 .materialize_input_row_count
1678 .with_guarded_label_values(label_list),
1679 materialize_current_epoch: self
1680 .materialize_current_epoch
1681 .with_guarded_label_values(label_list),
1682 }
1683 }
1684}
1685
/// Counters for a single input of an actor, keyed by the upstream fragment it reads from.
///
/// Built by `StreamingMetrics::new_actor_input_metrics`, which labels both counters with
/// `[actor_id, fragment_id, upstream_fragment_id]`.
pub(crate) struct ActorInputMetrics {
    /// Labelled child of `StreamingMetrics::actor_in_record_cnt`.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Labelled child of `StreamingMetrics::actor_input_buffer_blocking_duration_ns`.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1690
/// Scheduling, poll and idle counters bound to one actor.
///
/// Built by `StreamingMetrics::new_actor_metrics` with the label values
/// `[actor_id, fragment_id]`. Each field is the labelled child of the `StreamingMetrics`
/// field with the same name.
pub struct ActorMetrics {
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    pub actor_poll_duration: LabelGuardedIntCounter,
    pub actor_poll_cnt: LabelGuardedIntCounter,
    pub actor_idle_duration: LabelGuardedIntCounter,
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1700
/// Metrics bound to one sink executor instance.
///
/// Built by `StreamingMetrics::new_sink_exec_metrics` with the label values
/// `[sink_id, actor_id, fragment_id]`.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    // Gauge rather than counter: the value can be set up or down.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1706
/// Materialize-executor cache counters bound to one table of one actor.
///
/// Built by `StreamingMetrics::new_materialize_cache_metrics` with the label values
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeCacheMetrics {
    /// `stream_materialize_cache_hit_count`: "Materialize executor cache hit count".
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// `stream_materialize_data_exist_count`: "Materialize executor data exist count".
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// `stream_materialize_cache_total_count`: "Materialize executor cache total operation".
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1712
/// Materialize-executor input-row counter and current-epoch gauge for one table of one actor.
///
/// Built by `StreamingMetrics::new_materialize_metrics` with the label values
/// `[actor_id, table_id, fragment_id]`.
pub struct MaterializeMetrics {
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1717
/// Group top-N cache metrics for one table of one actor.
///
/// Shared by two constructors, both labelling with `[table_id, actor_id, fragment_id]`:
/// `StreamingMetrics::new_group_top_n_metrics` fills it from the `group_top_n_*` vectors, and
/// `StreamingMetrics::new_append_only_group_top_n_metrics` fills it from the
/// `group_top_n_appendonly_*` vectors.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1723
/// Lookup-executor cache metrics for one table of one actor.
///
/// Built by `StreamingMetrics::new_lookup_executor_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1729
/// Hash-aggregation metrics for one table of one actor.
///
/// Built by `StreamingMetrics::new_hash_agg_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`. Each field is the labelled child of the
/// `StreamingMetrics` field with the same name.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1741
/// Aggregation distinct-dedup cache metrics for one table of one actor.
///
/// Built by `StreamingMetrics::new_agg_distinct_dedup_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1747
/// Temporal-join cache metrics for one table of one actor.
///
/// Built by `StreamingMetrics::new_temporal_join_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1753
/// Backfill row counters for one table of one actor.
///
/// Built by `StreamingMetrics::new_backfill_metrics` with the label values
/// `[table_id, actor_id]`; there is no fragment label.
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1758
/// CDC backfill row counters for one table of one actor.
///
/// Built by `StreamingMetrics::new_cdc_backfill_metrics` with the label values
/// `[table_id, actor_id]`. Unlike the other per-executor metric structs in this file, it
/// derives `Clone`.
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1764
/// Over-window executor cache, range-cache and compute metrics for one table of one actor.
///
/// Built by `StreamingMetrics::new_over_window_metrics` with the label values
/// `[table_id, actor_id, fragment_id]`. Each field is the labelled child of the
/// `StreamingMetrics` field with the same name.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}