1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Handles to the metrics used by the streaming engine.
///
/// An instance is built by `StreamingMetrics::new`, which registers each metric
/// against a Prometheus `Registry`. The struct derives `Clone` so that
/// `global_streaming_metrics` can hand out copies of the shared instance.
///
/// Fields typed `Relabeled*` are produced in `new` by calling
/// `.relabel_debug_1(level)` on the freshly registered guarded vector.
///
/// NOTE(review): the per-field descriptions below are taken from the metric
/// name / help text of the identically named registration in `new`. The struct
/// literal that wires locals to fields is not part of the reviewed section, so
/// the mapping is presumed from the names — confirm when editing.
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level of this instance (presumably the `level` argument of `new`).
    pub level: MetricLevel,

    // ---- Executor metrics (disabled by default) ----
    /// `stream_executor_row_count`: total number of rows output from each executor.
    pub executor_row_count: RelabeledGuardedIntCounterVec,

    // ---- In-memory count maps ----
    // `new` creates two `CountMap::new()` values named `stream_node_output_row_count`
    // and `stream_node_output_blocking_duration_ns`; presumably they back these two
    // fields — TODO confirm against the struct literal.
    pub mem_stream_node_output_row_count: CountMap,
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // ---- Streaming actor metrics from tokio (private) ----
    // Except for `actor_execution_time`, these are all registered with the help text
    // "tokio's metrics" and the single label `actor_id`.
    /// `stream_actor_actor_execution_time`: total execution time (s) of an actor.
    actor_execution_time: LabelGuardedGaugeVec,
    /// `stream_actor_scheduled_duration`.
    actor_scheduled_duration: LabelGuardedGaugeVec,
    /// `stream_actor_scheduled_cnt`.
    actor_scheduled_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_fast_poll_duration`.
    actor_fast_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_fast_poll_cnt`.
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_slow_poll_duration`.
    actor_slow_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_slow_poll_cnt`.
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_poll_duration`.
    actor_poll_duration: LabelGuardedGaugeVec,
    /// `stream_actor_poll_cnt`.
    actor_poll_cnt: LabelGuardedIntGaugeVec,
    /// `stream_actor_idle_duration`.
    actor_idle_duration: LabelGuardedGaugeVec,
    /// `stream_actor_idle_cnt`.
    actor_idle_cnt: LabelGuardedIntGaugeVec,

    // ---- Streaming actor ----
    /// `stream_actor_count`: total number of actors (parallelism), labelled by `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec,
    /// `stream_actor_in_record_cnt`: total number of rows an actor received.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_out_record_cnt`: total number of rows an actor sent.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
    /// `stream_actor_current_epoch`: current epoch of an actor.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec,

    // ---- Source ----
    /// `stream_source_output_rows_counts`: total number of rows output from a source.
    pub source_output_row_count: LabelGuardedIntCounterVec,
    /// `stream_source_split_change_event_count`: split change events handled by a source.
    pub source_split_change_count: LabelGuardedIntCounterVec,
    /// `stream_source_backfill_rows_counts`: total number of rows backfilled for a source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec,

    // ---- Sink (private) ----
    /// `stream_sink_input_row_count`: total number of rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec,
    /// `stream_sink_input_bytes`: total size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec,
    /// `stream_sink_chunk_buffer_size`: total size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec,

    // ---- Exchange (see also `compute::ExchangeServiceMetrics`) ----
    /// `stream_exchange_frag_recv_size`: total size of messages received from the
    /// upstream fragment, labelled by `up_fragment_id` / `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec,

    // ---- Streaming merge ----
    /// `stream_merge_barrier_align_duration`: histogram of merge barrier alignment duration.
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec,

    // ---- Backpressure ----
    /// `stream_actor_output_buffer_blocking_duration_ns`: total blocking duration (ns)
    /// of the output buffer.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
    /// `stream_actor_input_buffer_blocking_duration_ns`: total blocking duration (ns)
    /// of the input buffer.
    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec,

    // ---- Streaming join ----
    /// `stream_join_lookup_miss_count`.
    /// NOTE(review): the registered help text says "miss duration" although the metric
    /// is a counter named `..._count` — confirm which is intended.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_lookup_total_count`: join executor total lookup operations.
    pub join_lookup_total_count: LabelGuardedIntCounterVec,
    /// `stream_join_insert_cache_miss_count`: join executor cache misses on insert.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_join_actor_input_waiting_duration_ns`: total waiting duration (ns) of the
    /// input buffer of a join actor.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_match_duration_ns`: matching duration for each side.
    pub join_match_duration_ns: LabelGuardedIntCounterVec,
    /// `stream_join_cached_entry_count`: number of cached entries in streaming join operators.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_join_matched_join_keys`: histogram of the number of keys matched in the
    /// opposite side.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec,

    // ---- Streaming join / streaming dynamic filter / streaming union ----
    /// `stream_barrier_align_duration_ns`, labelled by
    /// `actor_id`, `fragment_id`, `wait_side`, `executor`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec,

    // ---- Streaming aggregation (private) ----
    // All registered with labels `table_id`, `actor_id`, `fragment_id`.
    /// `stream_agg_lookup_miss_count`.
    agg_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_lookup_total_count`: aggregation executor total lookup operations.
    agg_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_cached_entry_count`: number of cached keys in streaming aggregation operators.
    agg_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_chunk_lookup_miss_count`: chunk-level lookup misses.
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_chunk_lookup_total_count`: chunk-level total lookup operations.
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_dirty_groups_count`: total dirty group count in the aggregation executor.
    agg_dirty_groups_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_dirty_groups_heap_size`: total dirty group heap size in the aggregation executor.
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
    /// `stream_agg_distinct_cache_miss_count`.
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_total_cache_count`: distinct aggregation total operations.
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
    /// `stream_agg_distinct_cached_entry_count`: total entries in the distinct aggregation cache.
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_agg_state_cache_lookup_count`: aggregation state cache lookups.
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_agg_state_cache_miss_count`: aggregation state cache misses.
    agg_state_cache_miss_count: LabelGuardedIntCounterVec,

    // ---- Streaming TopN (private) ----
    /// `stream_group_top_n_cache_miss_count`: group top-n executor cache misses.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_total_query_cache_count`: group top-n executor total cache queries.
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_cached_entry_count`: total entries in the group top-n executor cache.
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_group_top_n_appendonly_cache_miss_count`: append-only group top-n cache misses.
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_total_query_cache_count`: append-only group top-n
    /// total cache queries.
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_group_top_n_appendonly_cached_entry_count`: total entries in the append-only
    /// group top-n executor cache.
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Lookup executor (private) ----
    /// `stream_lookup_cache_miss_count`: lookup executor cache misses.
    lookup_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_total_query_cache_count`: lookup executor total cache queries.
    lookup_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_lookup_cached_entry_count`: total entries in the lookup executor cache.
    lookup_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Temporal join (private) ----
    /// `stream_temporal_join_cache_miss_count`: temporal join executor cache misses.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_total_query_cache_count`: temporal join total cache queries.
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
    /// `stream_temporal_join_cached_entry_count`: total entries in the temporal join cache.
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,

    // ---- Backfill (private) ----
    /// `stream_backfill_snapshot_read_row_count`: rows read from the backfill snapshot.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_backfill_upstream_output_row_count`: rows output from the backfill upstream.
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- CDC backfill (private) ----
    /// `stream_cdc_backfill_snapshot_read_row_count`: rows read from the cdc_backfill snapshot.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
    /// `stream_cdc_backfill_upstream_output_row_count`: rows output from the cdc_backfill upstream.
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,

    // ---- Snapshot backfill ----
    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output from snapshot
    /// backfill, labelled by `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,

    // ---- Over window (private) ----
    /// `stream_over_window_cached_entry_count`: total entry (partition) count in the
    /// over window executor cache.
    over_window_cached_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_cache_lookup_count`: over window executor cache lookups.
    over_window_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_cache_miss_count`: over window executor cache misses.
    over_window_cache_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_entry_count`: partition range cache entry count.
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
    /// `stream_over_window_range_cache_lookup_count`: partition range cache lookups.
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_left_miss_count`: partition range cache left misses.
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_range_cache_right_miss_count`: partition range cache right misses.
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_accessed_entry_count`: over window accessed entry count.
    over_window_accessed_entry_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_compute_count`: over window compute count.
    over_window_compute_count: LabelGuardedIntCounterVec,
    /// `stream_over_window_same_output_count`: over window same output count.
    over_window_same_output_count: LabelGuardedIntCounterVec,

    // ---- Barrier (unlabelled metrics) ----
    /// Histogram `stream_barrier_inflight_duration_seconds`.
    pub barrier_inflight_latency: Histogram,
    /// Histogram `stream_barrier_sync_storage_duration_seconds`.
    pub barrier_sync_latency: Histogram,
    /// Histogram `stream_barrier_batch_size`.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have processed the
    /// earliest in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // ---- KV log store ----
    // Registrations for the rewind and buffer-unconsumed metrics are outside the
    // reviewed section, so they are left undescribed here.
    /// `kv_log_store_storage_write_count`: write row count throughput of the kv log store.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// `kv_log_store_storage_write_size`: write size throughput of the kv log store.
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
    /// `kv_log_store_storage_read_count`, additionally labelled by `read_type`.
    /// NOTE(review): its registered help text reads "Write row count throughput ...",
    /// which looks copied from the write counter — confirm and fix in `new`.
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
    /// `kv_log_store_storage_read_size`.
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Sync KV log store ----
    // Common labels: `actor_id`, `target`, `fragment_id`, `relation`.
    /// `sync_kv_log_store_read_count`: read row count throughput (extra label `type`).
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_read_size`: read size throughput (extra label `type`).
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_write_pause_duration_ns`: duration (ns) of write pause.
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_state`: clean/unclean state transitions (extra label `state`).
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_wait_next_poll_ns`: total duration (ns) of waiting for next poll.
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_count`: write row count throughput.
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_storage_write_size`: write size throughput.
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
    /// `sync_kv_log_store_buffer_unconsumed_item_count`: number of unconsumed items in buffer.
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_row_count`: number of unconsumed rows in buffer.
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_epoch_count`: number of unconsumed epochs in buffer.
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
    /// `sync_kv_log_store_buffer_unconsumed_min_epoch`.
    /// NOTE(review): registered with the same help text as the epoch-count gauge
    /// ("Number of Unconsumed Epoch in buffer"); presumably this is the minimum
    /// unconsumed epoch — confirm and fix the help text in `new`.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,

    // ---- Memory management ----
    // Registrations for these metrics are outside the reviewed section; names only.
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec,

    // ---- Materialized view ----
    // Registrations for the three cache counters are outside the reviewed section.
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
    materialize_data_exist_count: RelabeledGuardedIntCounterVec,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_input_row_count`: total number of rows streamed into materialize executors.
    materialize_input_row_count: RelabeledGuardedIntCounterVec,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
}
217
/// Process-wide slot holding the shared [`StreamingMetrics`] instance.
///
/// It starts empty and is filled at most once; `global_streaming_metrics`
/// populates it through `OnceLock::get_or_init` on first use.
pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
219
220pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
221 GLOBAL_STREAMING_METRICS
222 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
223 .clone()
224}
225
226impl StreamingMetrics {
227 fn new(registry: &Registry, level: MetricLevel) -> Self {
228 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
229 "stream_executor_row_count",
230 "Total number of rows that have been output from each executor",
231 &["actor_id", "fragment_id", "executor_identity"],
232 registry
233 )
234 .unwrap()
235 .relabel_debug_1(level);
236
237 let stream_node_output_row_count = CountMap::new();
238 let stream_node_output_blocking_duration_ns = CountMap::new();
239
240 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
241 "stream_source_output_rows_counts",
242 "Total number of rows that have been output from source",
243 &["source_id", "source_name", "actor_id", "fragment_id"],
244 registry
245 )
246 .unwrap();
247
248 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
249 "stream_source_split_change_event_count",
250 "Total number of split change events that have been operated by source",
251 &["source_id", "source_name", "actor_id", "fragment_id"],
252 registry
253 )
254 .unwrap();
255
256 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
257 "stream_source_backfill_rows_counts",
258 "Total number of rows that have been backfilled for source",
259 &["source_id", "source_name", "actor_id", "fragment_id"],
260 registry
261 )
262 .unwrap();
263
264 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
265 "stream_sink_input_row_count",
266 "Total number of rows streamed into sink executors",
267 &["sink_id", "actor_id", "fragment_id"],
268 registry
269 )
270 .unwrap();
271
272 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
273 "stream_sink_input_bytes",
274 "Total size of chunks streamed into sink executors",
275 &["sink_id", "actor_id", "fragment_id"],
276 registry
277 )
278 .unwrap();
279
280 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
281 "stream_mview_input_row_count",
282 "Total number of rows streamed into materialize executors",
283 &["actor_id", "table_id", "fragment_id"],
284 registry
285 )
286 .unwrap()
287 .relabel_debug_1(level);
288
289 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
290 "stream_mview_current_epoch",
291 "The current epoch of the materialized executor",
292 &["actor_id", "table_id", "fragment_id"],
293 registry
294 )
295 .unwrap()
296 .relabel_debug_1(level);
297
298 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
299 "stream_sink_chunk_buffer_size",
300 "Total size of chunks buffered in a barrier",
301 &["sink_id", "actor_id", "fragment_id"],
302 registry
303 )
304 .unwrap();
305
306 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
307 "stream_actor_actor_execution_time",
308 "Total execution time (s) of an actor",
309 &["actor_id"],
310 registry
311 )
312 .unwrap();
313
314 let actor_output_buffer_blocking_duration_ns =
315 register_guarded_int_counter_vec_with_registry!(
316 "stream_actor_output_buffer_blocking_duration_ns",
317 "Total blocking duration (ns) of output buffer",
318 &["actor_id", "fragment_id", "downstream_fragment_id"],
319 registry
320 )
321 .unwrap()
322 .relabel_debug_1(level);
324
325 let actor_input_buffer_blocking_duration_ns =
326 register_guarded_int_counter_vec_with_registry!(
327 "stream_actor_input_buffer_blocking_duration_ns",
328 "Total blocking duration (ns) of input buffer",
329 &["actor_id", "fragment_id", "upstream_fragment_id"],
330 registry
331 )
332 .unwrap();
333
334 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
335 "stream_exchange_frag_recv_size",
336 "Total size of messages that have been received from upstream Fragment",
337 &["up_fragment_id", "down_fragment_id"],
338 registry
339 )
340 .unwrap();
341
342 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
343 "stream_actor_fast_poll_duration",
344 "tokio's metrics",
345 &["actor_id"],
346 registry
347 )
348 .unwrap();
349
350 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
351 "stream_actor_fast_poll_cnt",
352 "tokio's metrics",
353 &["actor_id"],
354 registry
355 )
356 .unwrap();
357
358 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
359 "stream_actor_slow_poll_duration",
360 "tokio's metrics",
361 &["actor_id"],
362 registry
363 )
364 .unwrap();
365
366 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
367 "stream_actor_slow_poll_cnt",
368 "tokio's metrics",
369 &["actor_id"],
370 registry
371 )
372 .unwrap();
373
374 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
375 "stream_actor_poll_duration",
376 "tokio's metrics",
377 &["actor_id"],
378 registry
379 )
380 .unwrap();
381
382 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
383 "stream_actor_poll_cnt",
384 "tokio's metrics",
385 &["actor_id"],
386 registry
387 )
388 .unwrap();
389
390 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
391 "stream_actor_scheduled_duration",
392 "tokio's metrics",
393 &["actor_id"],
394 registry
395 )
396 .unwrap();
397
398 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
399 "stream_actor_scheduled_cnt",
400 "tokio's metrics",
401 &["actor_id"],
402 registry
403 )
404 .unwrap();
405
406 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
407 "stream_actor_idle_duration",
408 "tokio's metrics",
409 &["actor_id"],
410 registry
411 )
412 .unwrap();
413
414 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
415 "stream_actor_idle_cnt",
416 "tokio's metrics",
417 &["actor_id"],
418 registry
419 )
420 .unwrap();
421
422 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
423 "stream_actor_in_record_cnt",
424 "Total number of rows actor received",
425 &["actor_id", "fragment_id", "upstream_fragment_id"],
426 registry
427 )
428 .unwrap()
429 .relabel_debug_1(level);
430
431 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
432 "stream_actor_out_record_cnt",
433 "Total number of rows actor sent",
434 &["actor_id", "fragment_id"],
435 registry
436 )
437 .unwrap()
438 .relabel_debug_1(level);
439
440 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
441 "stream_actor_current_epoch",
442 "Current epoch of actor",
443 &["actor_id", "fragment_id"],
444 registry
445 )
446 .unwrap()
447 .relabel_debug_1(level);
448
449 let actor_count = register_guarded_int_gauge_vec_with_registry!(
450 "stream_actor_count",
451 "Total number of actors (parallelism)",
452 &["fragment_id"],
453 registry
454 )
455 .unwrap();
456
457 let opts = histogram_opts!(
458 "stream_merge_barrier_align_duration",
459 "Duration of merge align barrier",
460 exponential_buckets(0.0001, 2.0, 21).unwrap() );
462 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
463 opts,
464 &["actor_id", "fragment_id"],
465 registry
466 )
467 .unwrap()
468 .relabel_debug_1(level);
469
470 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
471 "stream_join_lookup_miss_count",
472 "Join executor lookup miss duration",
473 &["side", "join_table_id", "actor_id", "fragment_id"],
474 registry
475 )
476 .unwrap();
477
478 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
479 "stream_join_lookup_total_count",
480 "Join executor lookup total operation",
481 &["side", "join_table_id", "actor_id", "fragment_id"],
482 registry
483 )
484 .unwrap();
485
486 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
487 "stream_join_insert_cache_miss_count",
488 "Join executor cache miss when insert operation",
489 &["side", "join_table_id", "actor_id", "fragment_id"],
490 registry
491 )
492 .unwrap();
493
494 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
495 "stream_join_actor_input_waiting_duration_ns",
496 "Total waiting duration (ns) of input buffer of join actor",
497 &["actor_id", "fragment_id"],
498 registry
499 )
500 .unwrap();
501
502 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
503 "stream_join_match_duration_ns",
504 "Matching duration for each side",
505 &["actor_id", "fragment_id", "side"],
506 registry
507 )
508 .unwrap();
509
510 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
511 "stream_barrier_align_duration_ns",
512 "Duration of join align barrier",
513 &["actor_id", "fragment_id", "wait_side", "executor"],
514 registry
515 )
516 .unwrap()
517 .relabel_debug_1(level);
518
519 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
520 "stream_join_cached_entry_count",
521 "Number of cached entries in streaming join operators",
522 &["actor_id", "fragment_id", "side"],
523 registry
524 )
525 .unwrap();
526
527 let join_matched_join_keys_opts = histogram_opts!(
528 "stream_join_matched_join_keys",
529 "The number of keys matched in the opposite side",
530 exponential_buckets(16.0, 2.0, 28).unwrap() );
532
533 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
534 join_matched_join_keys_opts,
535 &["actor_id", "fragment_id", "table_id"],
536 registry
537 )
538 .unwrap()
539 .relabel_debug_1(level);
540
541 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
542 "stream_agg_lookup_miss_count",
543 "Aggregation executor lookup miss duration",
544 &["table_id", "actor_id", "fragment_id"],
545 registry
546 )
547 .unwrap();
548
549 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
550 "stream_agg_lookup_total_count",
551 "Aggregation executor lookup total operation",
552 &["table_id", "actor_id", "fragment_id"],
553 registry
554 )
555 .unwrap();
556
557 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
558 "stream_agg_distinct_cache_miss_count",
559 "Aggregation executor dinsinct miss duration",
560 &["table_id", "actor_id", "fragment_id"],
561 registry
562 )
563 .unwrap();
564
565 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
566 "stream_agg_distinct_total_cache_count",
567 "Aggregation executor distinct total operation",
568 &["table_id", "actor_id", "fragment_id"],
569 registry
570 )
571 .unwrap();
572
573 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
574 "stream_agg_distinct_cached_entry_count",
575 "Total entry counts in distinct aggregation executor cache",
576 &["table_id", "actor_id", "fragment_id"],
577 registry
578 )
579 .unwrap();
580
581 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
582 "stream_agg_dirty_groups_count",
583 "Total dirty group counts in aggregation executor",
584 &["table_id", "actor_id", "fragment_id"],
585 registry
586 )
587 .unwrap();
588
589 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
590 "stream_agg_dirty_groups_heap_size",
591 "Total dirty group heap size in aggregation executor",
592 &["table_id", "actor_id", "fragment_id"],
593 registry
594 )
595 .unwrap();
596
597 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
598 "stream_agg_state_cache_lookup_count",
599 "Aggregation executor state cache lookup count",
600 &["table_id", "actor_id", "fragment_id"],
601 registry
602 )
603 .unwrap();
604
605 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
606 "stream_agg_state_cache_miss_count",
607 "Aggregation executor state cache miss count",
608 &["table_id", "actor_id", "fragment_id"],
609 registry
610 )
611 .unwrap();
612
613 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
614 "stream_group_top_n_cache_miss_count",
615 "Group top n executor cache miss count",
616 &["table_id", "actor_id", "fragment_id"],
617 registry
618 )
619 .unwrap();
620
621 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
622 "stream_group_top_n_total_query_cache_count",
623 "Group top n executor query cache total count",
624 &["table_id", "actor_id", "fragment_id"],
625 registry
626 )
627 .unwrap();
628
629 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
630 "stream_group_top_n_cached_entry_count",
631 "Total entry counts in group top n executor cache",
632 &["table_id", "actor_id", "fragment_id"],
633 registry
634 )
635 .unwrap();
636
637 let group_top_n_appendonly_cache_miss_count =
638 register_guarded_int_counter_vec_with_registry!(
639 "stream_group_top_n_appendonly_cache_miss_count",
640 "Group top n appendonly executor cache miss count",
641 &["table_id", "actor_id", "fragment_id"],
642 registry
643 )
644 .unwrap();
645
646 let group_top_n_appendonly_total_query_cache_count =
647 register_guarded_int_counter_vec_with_registry!(
648 "stream_group_top_n_appendonly_total_query_cache_count",
649 "Group top n appendonly executor total cache count",
650 &["table_id", "actor_id", "fragment_id"],
651 registry
652 )
653 .unwrap();
654
655 let group_top_n_appendonly_cached_entry_count =
656 register_guarded_int_gauge_vec_with_registry!(
657 "stream_group_top_n_appendonly_cached_entry_count",
658 "Total entry counts in group top n appendonly executor cache",
659 &["table_id", "actor_id", "fragment_id"],
660 registry
661 )
662 .unwrap();
663
664 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
665 "stream_lookup_cache_miss_count",
666 "Lookup executor cache miss count",
667 &["table_id", "actor_id", "fragment_id"],
668 registry
669 )
670 .unwrap();
671
672 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
673 "stream_lookup_total_query_cache_count",
674 "Lookup executor query cache total count",
675 &["table_id", "actor_id", "fragment_id"],
676 registry
677 )
678 .unwrap();
679
680 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
681 "stream_lookup_cached_entry_count",
682 "Total entry counts in lookup executor cache",
683 &["table_id", "actor_id", "fragment_id"],
684 registry
685 )
686 .unwrap();
687
688 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
689 "stream_temporal_join_cache_miss_count",
690 "Temporal join executor cache miss count",
691 &["table_id", "actor_id", "fragment_id"],
692 registry
693 )
694 .unwrap();
695
696 let temporal_join_total_query_cache_count =
697 register_guarded_int_counter_vec_with_registry!(
698 "stream_temporal_join_total_query_cache_count",
699 "Temporal join executor query cache total count",
700 &["table_id", "actor_id", "fragment_id"],
701 registry
702 )
703 .unwrap();
704
705 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
706 "stream_temporal_join_cached_entry_count",
707 "Total entry count in temporal join executor cache",
708 &["table_id", "actor_id", "fragment_id"],
709 registry
710 )
711 .unwrap();
712
713 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
714 "stream_agg_cached_entry_count",
715 "Number of cached keys in streaming aggregation operators",
716 &["table_id", "actor_id", "fragment_id"],
717 registry
718 )
719 .unwrap();
720
721 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
722 "stream_agg_chunk_lookup_miss_count",
723 "Aggregation executor chunk-level lookup miss duration",
724 &["table_id", "actor_id", "fragment_id"],
725 registry
726 )
727 .unwrap();
728
729 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
730 "stream_agg_chunk_lookup_total_count",
731 "Aggregation executor chunk-level lookup total operation",
732 &["table_id", "actor_id", "fragment_id"],
733 registry
734 )
735 .unwrap();
736
737 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
738 "stream_backfill_snapshot_read_row_count",
739 "Total number of rows that have been read from the backfill snapshot",
740 &["table_id", "actor_id"],
741 registry
742 )
743 .unwrap();
744
745 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
746 "stream_backfill_upstream_output_row_count",
747 "Total number of rows that have been output from the backfill upstream",
748 &["table_id", "actor_id"],
749 registry
750 )
751 .unwrap();
752
753 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
754 "stream_cdc_backfill_snapshot_read_row_count",
755 "Total number of rows that have been read from the cdc_backfill snapshot",
756 &["table_id", "actor_id"],
757 registry
758 )
759 .unwrap();
760
761 let cdc_backfill_upstream_output_row_count =
762 register_guarded_int_counter_vec_with_registry!(
763 "stream_cdc_backfill_upstream_output_row_count",
764 "Total number of rows that have been output from the cdc_backfill upstream",
765 &["table_id", "actor_id"],
766 registry
767 )
768 .unwrap();
769
770 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
771 "stream_snapshot_backfill_consume_snapshot_row_count",
772 "Total number of rows that have been output from snapshot backfill",
773 &["table_id", "actor_id", "stage"],
774 registry
775 )
776 .unwrap();
777
778 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
779 "stream_over_window_cached_entry_count",
780 "Total entry (partition) count in over window executor cache",
781 &["table_id", "actor_id", "fragment_id"],
782 registry
783 )
784 .unwrap();
785
786 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
787 "stream_over_window_cache_lookup_count",
788 "Over window executor cache lookup count",
789 &["table_id", "actor_id", "fragment_id"],
790 registry
791 )
792 .unwrap();
793
794 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
795 "stream_over_window_cache_miss_count",
796 "Over window executor cache miss count",
797 &["table_id", "actor_id", "fragment_id"],
798 registry
799 )
800 .unwrap();
801
802 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
803 "stream_over_window_range_cache_entry_count",
804 "Over window partition range cache entry count",
805 &["table_id", "actor_id", "fragment_id"],
806 registry,
807 )
808 .unwrap();
809
810 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
811 "stream_over_window_range_cache_lookup_count",
812 "Over window partition range cache lookup count",
813 &["table_id", "actor_id", "fragment_id"],
814 registry
815 )
816 .unwrap();
817
818 let over_window_range_cache_left_miss_count =
819 register_guarded_int_counter_vec_with_registry!(
820 "stream_over_window_range_cache_left_miss_count",
821 "Over window partition range cache left miss count",
822 &["table_id", "actor_id", "fragment_id"],
823 registry
824 )
825 .unwrap();
826
827 let over_window_range_cache_right_miss_count =
828 register_guarded_int_counter_vec_with_registry!(
829 "stream_over_window_range_cache_right_miss_count",
830 "Over window partition range cache right miss count",
831 &["table_id", "actor_id", "fragment_id"],
832 registry
833 )
834 .unwrap();
835
836 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
837 "stream_over_window_accessed_entry_count",
838 "Over window accessed entry count",
839 &["table_id", "actor_id", "fragment_id"],
840 registry
841 )
842 .unwrap();
843
844 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
845 "stream_over_window_compute_count",
846 "Over window compute count",
847 &["table_id", "actor_id", "fragment_id"],
848 registry
849 )
850 .unwrap();
851
852 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
853 "stream_over_window_same_output_count",
854 "Over window same output count",
855 &["table_id", "actor_id", "fragment_id"],
856 registry
857 )
858 .unwrap();
859
860 let opts = histogram_opts!(
861 "stream_barrier_inflight_duration_seconds",
862 "barrier_inflight_latency",
863 exponential_buckets(0.1, 1.5, 16).unwrap() );
865 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
866
867 let opts = histogram_opts!(
868 "stream_barrier_sync_storage_duration_seconds",
869 "barrier_sync_latency",
870 exponential_buckets(0.1, 1.5, 16).unwrap() );
872 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
873
874 let opts = histogram_opts!(
875 "stream_barrier_batch_size",
876 "barrier_batch_size",
877 exponential_buckets(1.0, 2.0, 8).unwrap()
878 );
879 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
880
881 let barrier_manager_progress = register_int_counter_with_registry!(
882 "stream_barrier_manager_progress",
883 "The number of actors that have processed the earliest in-flight barriers",
884 registry
885 )
886 .unwrap();
887
888 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
889 "sync_kv_log_store_wait_next_poll_ns",
890 "Total duration (ns) of waiting for next poll",
891 &["actor_id", "target", "fragment_id", "relation"],
892 registry
893 )
894 .unwrap();
895
896 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
897 "sync_kv_log_store_read_count",
898 "read row count throughput of sync_kv log store",
899 &["type", "actor_id", "target", "fragment_id", "relation"],
900 registry
901 )
902 .unwrap();
903
904 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
905 "sync_kv_log_store_read_size",
906 "read size throughput of sync_kv log store",
907 &["type", "actor_id", "target", "fragment_id", "relation"],
908 registry
909 )
910 .unwrap();
911
912 let sync_kv_log_store_write_pause_duration_ns =
913 register_guarded_int_counter_vec_with_registry!(
914 "sync_kv_log_store_write_pause_duration_ns",
915 "Duration (ns) of sync_kv log store write pause",
916 &["actor_id", "target", "fragment_id", "relation"],
917 registry
918 )
919 .unwrap();
920
921 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
922 "sync_kv_log_store_state",
923 "clean/unclean state transition for sync_kv log store",
924 &["state", "actor_id", "target", "fragment_id", "relation"],
925 registry
926 )
927 .unwrap();
928
929 let sync_kv_log_store_storage_write_count =
930 register_guarded_int_counter_vec_with_registry!(
931 "sync_kv_log_store_storage_write_count",
932 "Write row count throughput of sync_kv log store",
933 &["actor_id", "target", "fragment_id", "relation"],
934 registry
935 )
936 .unwrap();
937
938 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
939 "sync_kv_log_store_storage_write_size",
940 "Write size throughput of sync_kv log store",
941 &["actor_id", "target", "fragment_id", "relation"],
942 registry
943 )
944 .unwrap();
945
946 let sync_kv_log_store_buffer_unconsumed_item_count =
947 register_guarded_int_gauge_vec_with_registry!(
948 "sync_kv_log_store_buffer_unconsumed_item_count",
949 "Number of Unconsumed Item in buffer",
950 &["actor_id", "target", "fragment_id", "relation"],
951 registry
952 )
953 .unwrap();
954
955 let sync_kv_log_store_buffer_unconsumed_row_count =
956 register_guarded_int_gauge_vec_with_registry!(
957 "sync_kv_log_store_buffer_unconsumed_row_count",
958 "Number of Unconsumed Row in buffer",
959 &["actor_id", "target", "fragment_id", "relation"],
960 registry
961 )
962 .unwrap();
963
964 let sync_kv_log_store_buffer_unconsumed_epoch_count =
965 register_guarded_int_gauge_vec_with_registry!(
966 "sync_kv_log_store_buffer_unconsumed_epoch_count",
967 "Number of Unconsumed Epoch in buffer",
968 &["actor_id", "target", "fragment_id", "relation"],
969 registry
970 )
971 .unwrap();
972
973 let sync_kv_log_store_buffer_unconsumed_min_epoch =
974 register_guarded_int_gauge_vec_with_registry!(
975 "sync_kv_log_store_buffer_unconsumed_min_epoch",
976 "Number of Unconsumed Epoch in buffer",
977 &["actor_id", "target", "fragment_id", "relation"],
978 registry
979 )
980 .unwrap();
981
982 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
983 "kv_log_store_storage_write_count",
984 "Write row count throughput of kv log store",
985 &["actor_id", "connector", "sink_id", "sink_name"],
986 registry
987 )
988 .unwrap();
989
990 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
991 "kv_log_store_storage_write_size",
992 "Write size throughput of kv log store",
993 &["actor_id", "connector", "sink_id", "sink_name"],
994 registry
995 )
996 .unwrap();
997
998 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
999 "kv_log_store_storage_read_count",
1000 "Write row count throughput of kv log store",
1001 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1002 registry
1003 )
1004 .unwrap();
1005
1006 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1007 "kv_log_store_storage_read_size",
1008 "Write size throughput of kv log store",
1009 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1010 registry
1011 )
1012 .unwrap();
1013
1014 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1015 "kv_log_store_rewind_count",
1016 "Kv log store rewind rate",
1017 &["actor_id", "connector", "sink_id", "sink_name"],
1018 registry
1019 )
1020 .unwrap();
1021
1022 let kv_log_store_rewind_delay_opts = {
1023 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1024 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1025 - REWIND_BASE_DELAY.as_secs_f64().log2())
1026 .ceil() as usize;
1027 let buckets = exponential_buckets(
1028 REWIND_BASE_DELAY.as_secs_f64(),
1029 REWIND_BACKOFF_FACTOR as _,
1030 bucket_count,
1031 )
1032 .unwrap();
1033 histogram_opts!(
1034 "kv_log_store_rewind_delay",
1035 "Kv log store rewind delay",
1036 buckets,
1037 )
1038 };
1039
1040 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1041 kv_log_store_rewind_delay_opts,
1042 &["actor_id", "connector", "sink_id", "sink_name"],
1043 registry
1044 )
1045 .unwrap();
1046
1047 let kv_log_store_buffer_unconsumed_item_count =
1048 register_guarded_int_gauge_vec_with_registry!(
1049 "kv_log_store_buffer_unconsumed_item_count",
1050 "Number of Unconsumed Item in buffer",
1051 &["actor_id", "connector", "sink_id", "sink_name"],
1052 registry
1053 )
1054 .unwrap();
1055
1056 let kv_log_store_buffer_unconsumed_row_count =
1057 register_guarded_int_gauge_vec_with_registry!(
1058 "kv_log_store_buffer_unconsumed_row_count",
1059 "Number of Unconsumed Row in buffer",
1060 &["actor_id", "connector", "sink_id", "sink_name"],
1061 registry
1062 )
1063 .unwrap();
1064
1065 let kv_log_store_buffer_unconsumed_epoch_count =
1066 register_guarded_int_gauge_vec_with_registry!(
1067 "kv_log_store_buffer_unconsumed_epoch_count",
1068 "Number of Unconsumed Epoch in buffer",
1069 &["actor_id", "connector", "sink_id", "sink_name"],
1070 registry
1071 )
1072 .unwrap();
1073
1074 let kv_log_store_buffer_unconsumed_min_epoch =
1075 register_guarded_int_gauge_vec_with_registry!(
1076 "kv_log_store_buffer_unconsumed_min_epoch",
1077 "Number of Unconsumed Epoch in buffer",
1078 &["actor_id", "connector", "sink_id", "sink_name"],
1079 registry
1080 )
1081 .unwrap();
1082
1083 let lru_runtime_loop_count = register_int_counter_with_registry!(
1084 "lru_runtime_loop_count",
1085 "The counts of the eviction loop in LRU manager per second",
1086 registry
1087 )
1088 .unwrap();
1089
1090 let lru_latest_sequence = register_int_gauge_with_registry!(
1091 "lru_latest_sequence",
1092 "Current LRU global sequence",
1093 registry,
1094 )
1095 .unwrap();
1096
1097 let lru_watermark_sequence = register_int_gauge_with_registry!(
1098 "lru_watermark_sequence",
1099 "Current LRU watermark sequence",
1100 registry,
1101 )
1102 .unwrap();
1103
1104 let lru_eviction_policy = register_int_gauge_with_registry!(
1105 "lru_eviction_policy",
1106 "Current LRU eviction policy",
1107 registry,
1108 )
1109 .unwrap();
1110
1111 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1112 "jemalloc_allocated_bytes",
1113 "The allocated memory jemalloc, got from jemalloc_ctl",
1114 registry
1115 )
1116 .unwrap();
1117
1118 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1119 "jemalloc_active_bytes",
1120 "The active memory jemalloc, got from jemalloc_ctl",
1121 registry
1122 )
1123 .unwrap();
1124
1125 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1126 "jemalloc_resident_bytes",
1127 "The active memory jemalloc, got from jemalloc_ctl",
1128 registry
1129 )
1130 .unwrap();
1131
1132 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1133 "jemalloc_metadata_bytes",
1134 "The active memory jemalloc, got from jemalloc_ctl",
1135 registry
1136 )
1137 .unwrap();
1138
1139 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1140 "jvm_allocated_bytes",
1141 "The allocated jvm memory",
1142 registry
1143 )
1144 .unwrap();
1145
1146 let jvm_active_bytes = register_int_gauge_with_registry!(
1147 "jvm_active_bytes",
1148 "The active jvm memory",
1149 registry
1150 )
1151 .unwrap();
1152
1153 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1154 "stream_materialize_cache_hit_count",
1155 "Materialize executor cache hit count",
1156 &["actor_id", "table_id", "fragment_id"],
1157 registry
1158 )
1159 .unwrap()
1160 .relabel_debug_1(level);
1161
1162 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1163 "stream_materialize_data_exist_count",
1164 "Materialize executor data exist count",
1165 &["actor_id", "table_id", "fragment_id"],
1166 registry
1167 )
1168 .unwrap()
1169 .relabel_debug_1(level);
1170
1171 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1172 "stream_materialize_cache_total_count",
1173 "Materialize executor cache total operation",
1174 &["actor_id", "table_id", "fragment_id"],
1175 registry
1176 )
1177 .unwrap()
1178 .relabel_debug_1(level);
1179
1180 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1181 "stream_memory_usage",
1182 "Memory usage for stream executors",
1183 &["actor_id", "table_id", "desc"],
1184 registry
1185 )
1186 .unwrap()
1187 .relabel_debug_1(level);
1188
1189 Self {
1190 level,
1191 executor_row_count,
1192 mem_stream_node_output_row_count: stream_node_output_row_count,
1193 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1194 actor_execution_time,
1195 actor_scheduled_duration,
1196 actor_scheduled_cnt,
1197 actor_fast_poll_duration,
1198 actor_fast_poll_cnt,
1199 actor_slow_poll_duration,
1200 actor_slow_poll_cnt,
1201 actor_poll_duration,
1202 actor_poll_cnt,
1203 actor_idle_duration,
1204 actor_idle_cnt,
1205 actor_count,
1206 actor_in_record_cnt,
1207 actor_out_record_cnt,
1208 actor_current_epoch,
1209 source_output_row_count,
1210 source_split_change_count,
1211 source_backfill_row_count,
1212 sink_input_row_count,
1213 sink_input_bytes,
1214 sink_chunk_buffer_size,
1215 exchange_frag_recv_size,
1216 merge_barrier_align_duration,
1217 actor_output_buffer_blocking_duration_ns,
1218 actor_input_buffer_blocking_duration_ns,
1219 join_lookup_miss_count,
1220 join_lookup_total_count,
1221 join_insert_cache_miss_count,
1222 join_actor_input_waiting_duration_ns,
1223 join_match_duration_ns,
1224 join_cached_entry_count,
1225 join_matched_join_keys,
1226 barrier_align_duration,
1227 agg_lookup_miss_count,
1228 agg_total_lookup_count,
1229 agg_cached_entry_count,
1230 agg_chunk_lookup_miss_count,
1231 agg_chunk_total_lookup_count,
1232 agg_dirty_groups_count,
1233 agg_dirty_groups_heap_size,
1234 agg_distinct_cache_miss_count,
1235 agg_distinct_total_cache_count,
1236 agg_distinct_cached_entry_count,
1237 agg_state_cache_lookup_count,
1238 agg_state_cache_miss_count,
1239 group_top_n_cache_miss_count,
1240 group_top_n_total_query_cache_count,
1241 group_top_n_cached_entry_count,
1242 group_top_n_appendonly_cache_miss_count,
1243 group_top_n_appendonly_total_query_cache_count,
1244 group_top_n_appendonly_cached_entry_count,
1245 lookup_cache_miss_count,
1246 lookup_total_query_cache_count,
1247 lookup_cached_entry_count,
1248 temporal_join_cache_miss_count,
1249 temporal_join_total_query_cache_count,
1250 temporal_join_cached_entry_count,
1251 backfill_snapshot_read_row_count,
1252 backfill_upstream_output_row_count,
1253 cdc_backfill_snapshot_read_row_count,
1254 cdc_backfill_upstream_output_row_count,
1255 snapshot_backfill_consume_row_count,
1256 over_window_cached_entry_count,
1257 over_window_cache_lookup_count,
1258 over_window_cache_miss_count,
1259 over_window_range_cache_entry_count,
1260 over_window_range_cache_lookup_count,
1261 over_window_range_cache_left_miss_count,
1262 over_window_range_cache_right_miss_count,
1263 over_window_accessed_entry_count,
1264 over_window_compute_count,
1265 over_window_same_output_count,
1266 barrier_inflight_latency,
1267 barrier_sync_latency,
1268 barrier_batch_size,
1269 barrier_manager_progress,
1270 kv_log_store_storage_write_count,
1271 kv_log_store_storage_write_size,
1272 kv_log_store_rewind_count,
1273 kv_log_store_rewind_delay,
1274 kv_log_store_storage_read_count,
1275 kv_log_store_storage_read_size,
1276 kv_log_store_buffer_unconsumed_item_count,
1277 kv_log_store_buffer_unconsumed_row_count,
1278 kv_log_store_buffer_unconsumed_epoch_count,
1279 kv_log_store_buffer_unconsumed_min_epoch,
1280 sync_kv_log_store_read_count,
1281 sync_kv_log_store_read_size,
1282 sync_kv_log_store_write_pause_duration_ns,
1283 sync_kv_log_store_state,
1284 sync_kv_log_store_wait_next_poll_ns,
1285 sync_kv_log_store_storage_write_count,
1286 sync_kv_log_store_storage_write_size,
1287 sync_kv_log_store_buffer_unconsumed_item_count,
1288 sync_kv_log_store_buffer_unconsumed_row_count,
1289 sync_kv_log_store_buffer_unconsumed_epoch_count,
1290 sync_kv_log_store_buffer_unconsumed_min_epoch,
1291 lru_runtime_loop_count,
1292 lru_latest_sequence,
1293 lru_watermark_sequence,
1294 lru_eviction_policy,
1295 jemalloc_allocated_bytes,
1296 jemalloc_active_bytes,
1297 jemalloc_resident_bytes,
1298 jemalloc_metadata_bytes,
1299 jvm_allocated_bytes,
1300 jvm_active_bytes,
1301 stream_memory_usage,
1302 materialize_cache_hit_count,
1303 materialize_data_exist_count,
1304 materialize_cache_total_count,
1305 materialize_input_row_count,
1306 materialize_current_epoch,
1307 }
1308 }
1309
1310 pub fn unused() -> Self {
1312 global_streaming_metrics(MetricLevel::Disabled)
1313 }
1314
1315 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1316 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1317 let actor_execution_time = self
1318 .actor_execution_time
1319 .with_guarded_label_values(label_list);
1320 let actor_scheduled_duration = self
1321 .actor_scheduled_duration
1322 .with_guarded_label_values(label_list);
1323 let actor_scheduled_cnt = self
1324 .actor_scheduled_cnt
1325 .with_guarded_label_values(label_list);
1326 let actor_fast_poll_duration = self
1327 .actor_fast_poll_duration
1328 .with_guarded_label_values(label_list);
1329 let actor_fast_poll_cnt = self
1330 .actor_fast_poll_cnt
1331 .with_guarded_label_values(label_list);
1332 let actor_slow_poll_duration = self
1333 .actor_slow_poll_duration
1334 .with_guarded_label_values(label_list);
1335 let actor_slow_poll_cnt = self
1336 .actor_slow_poll_cnt
1337 .with_guarded_label_values(label_list);
1338 let actor_poll_duration = self
1339 .actor_poll_duration
1340 .with_guarded_label_values(label_list);
1341 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1342 let actor_idle_duration = self
1343 .actor_idle_duration
1344 .with_guarded_label_values(label_list);
1345 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1346 ActorMetrics {
1347 actor_execution_time,
1348 actor_scheduled_duration,
1349 actor_scheduled_cnt,
1350 actor_fast_poll_duration,
1351 actor_fast_poll_cnt,
1352 actor_slow_poll_duration,
1353 actor_slow_poll_cnt,
1354 actor_poll_duration,
1355 actor_poll_cnt,
1356 actor_idle_duration,
1357 actor_idle_cnt,
1358 }
1359 }
1360
1361 pub(crate) fn new_actor_input_metrics(
1362 &self,
1363 actor_id: ActorId,
1364 fragment_id: FragmentId,
1365 upstream_fragment_id: FragmentId,
1366 ) -> ActorInputMetrics {
1367 let actor_id_str = actor_id.to_string();
1368 let fragment_id_str = fragment_id.to_string();
1369 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1370 ActorInputMetrics {
1371 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1372 &actor_id_str,
1373 &fragment_id_str,
1374 &upstream_fragment_id_str,
1375 ]),
1376 actor_input_buffer_blocking_duration_ns: self
1377 .actor_input_buffer_blocking_duration_ns
1378 .with_guarded_label_values(&[
1379 &actor_id_str,
1380 &fragment_id_str,
1381 &upstream_fragment_id_str,
1382 ]),
1383 }
1384 }
1385
1386 pub fn new_sink_exec_metrics(
1387 &self,
1388 id: SinkId,
1389 actor_id: ActorId,
1390 fragment_id: FragmentId,
1391 ) -> SinkExecutorMetrics {
1392 let label_list: &[&str; 3] = &[
1393 &id.to_string(),
1394 &actor_id.to_string(),
1395 &fragment_id.to_string(),
1396 ];
1397 SinkExecutorMetrics {
1398 sink_input_row_count: self
1399 .sink_input_row_count
1400 .with_guarded_label_values(label_list),
1401 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1402 sink_chunk_buffer_size: self
1403 .sink_chunk_buffer_size
1404 .with_guarded_label_values(label_list),
1405 }
1406 }
1407
1408 pub fn new_group_top_n_metrics(
1409 &self,
1410 table_id: u32,
1411 actor_id: ActorId,
1412 fragment_id: FragmentId,
1413 ) -> GroupTopNMetrics {
1414 let label_list: &[&str; 3] = &[
1415 &table_id.to_string(),
1416 &actor_id.to_string(),
1417 &fragment_id.to_string(),
1418 ];
1419
1420 GroupTopNMetrics {
1421 group_top_n_cache_miss_count: self
1422 .group_top_n_cache_miss_count
1423 .with_guarded_label_values(label_list),
1424 group_top_n_total_query_cache_count: self
1425 .group_top_n_total_query_cache_count
1426 .with_guarded_label_values(label_list),
1427 group_top_n_cached_entry_count: self
1428 .group_top_n_cached_entry_count
1429 .with_guarded_label_values(label_list),
1430 }
1431 }
1432
1433 pub fn new_append_only_group_top_n_metrics(
1434 &self,
1435 table_id: u32,
1436 actor_id: ActorId,
1437 fragment_id: FragmentId,
1438 ) -> GroupTopNMetrics {
1439 let label_list: &[&str; 3] = &[
1440 &table_id.to_string(),
1441 &actor_id.to_string(),
1442 &fragment_id.to_string(),
1443 ];
1444
1445 GroupTopNMetrics {
1446 group_top_n_cache_miss_count: self
1447 .group_top_n_appendonly_cache_miss_count
1448 .with_guarded_label_values(label_list),
1449 group_top_n_total_query_cache_count: self
1450 .group_top_n_appendonly_total_query_cache_count
1451 .with_guarded_label_values(label_list),
1452 group_top_n_cached_entry_count: self
1453 .group_top_n_appendonly_cached_entry_count
1454 .with_guarded_label_values(label_list),
1455 }
1456 }
1457
1458 pub fn new_lookup_executor_metrics(
1459 &self,
1460 table_id: TableId,
1461 actor_id: ActorId,
1462 fragment_id: FragmentId,
1463 ) -> LookupExecutorMetrics {
1464 let label_list: &[&str; 3] = &[
1465 &table_id.to_string(),
1466 &actor_id.to_string(),
1467 &fragment_id.to_string(),
1468 ];
1469
1470 LookupExecutorMetrics {
1471 lookup_cache_miss_count: self
1472 .lookup_cache_miss_count
1473 .with_guarded_label_values(label_list),
1474 lookup_total_query_cache_count: self
1475 .lookup_total_query_cache_count
1476 .with_guarded_label_values(label_list),
1477 lookup_cached_entry_count: self
1478 .lookup_cached_entry_count
1479 .with_guarded_label_values(label_list),
1480 }
1481 }
1482
1483 pub fn new_hash_agg_metrics(
1484 &self,
1485 table_id: u32,
1486 actor_id: ActorId,
1487 fragment_id: FragmentId,
1488 ) -> HashAggMetrics {
1489 let label_list: &[&str; 3] = &[
1490 &table_id.to_string(),
1491 &actor_id.to_string(),
1492 &fragment_id.to_string(),
1493 ];
1494 HashAggMetrics {
1495 agg_lookup_miss_count: self
1496 .agg_lookup_miss_count
1497 .with_guarded_label_values(label_list),
1498 agg_total_lookup_count: self
1499 .agg_total_lookup_count
1500 .with_guarded_label_values(label_list),
1501 agg_cached_entry_count: self
1502 .agg_cached_entry_count
1503 .with_guarded_label_values(label_list),
1504 agg_chunk_lookup_miss_count: self
1505 .agg_chunk_lookup_miss_count
1506 .with_guarded_label_values(label_list),
1507 agg_chunk_total_lookup_count: self
1508 .agg_chunk_total_lookup_count
1509 .with_guarded_label_values(label_list),
1510 agg_dirty_groups_count: self
1511 .agg_dirty_groups_count
1512 .with_guarded_label_values(label_list),
1513 agg_dirty_groups_heap_size: self
1514 .agg_dirty_groups_heap_size
1515 .with_guarded_label_values(label_list),
1516 agg_state_cache_lookup_count: self
1517 .agg_state_cache_lookup_count
1518 .with_guarded_label_values(label_list),
1519 agg_state_cache_miss_count: self
1520 .agg_state_cache_miss_count
1521 .with_guarded_label_values(label_list),
1522 }
1523 }
1524
1525 pub fn new_agg_distinct_dedup_metrics(
1526 &self,
1527 table_id: u32,
1528 actor_id: ActorId,
1529 fragment_id: FragmentId,
1530 ) -> AggDistinctDedupMetrics {
1531 let label_list: &[&str; 3] = &[
1532 &table_id.to_string(),
1533 &actor_id.to_string(),
1534 &fragment_id.to_string(),
1535 ];
1536 AggDistinctDedupMetrics {
1537 agg_distinct_cache_miss_count: self
1538 .agg_distinct_cache_miss_count
1539 .with_guarded_label_values(label_list),
1540 agg_distinct_total_cache_count: self
1541 .agg_distinct_total_cache_count
1542 .with_guarded_label_values(label_list),
1543 agg_distinct_cached_entry_count: self
1544 .agg_distinct_cached_entry_count
1545 .with_guarded_label_values(label_list),
1546 }
1547 }
1548
1549 pub fn new_temporal_join_metrics(
1550 &self,
1551 table_id: TableId,
1552 actor_id: ActorId,
1553 fragment_id: FragmentId,
1554 ) -> TemporalJoinMetrics {
1555 let label_list: &[&str; 3] = &[
1556 &table_id.to_string(),
1557 &actor_id.to_string(),
1558 &fragment_id.to_string(),
1559 ];
1560 TemporalJoinMetrics {
1561 temporal_join_cache_miss_count: self
1562 .temporal_join_cache_miss_count
1563 .with_guarded_label_values(label_list),
1564 temporal_join_total_query_cache_count: self
1565 .temporal_join_total_query_cache_count
1566 .with_guarded_label_values(label_list),
1567 temporal_join_cached_entry_count: self
1568 .temporal_join_cached_entry_count
1569 .with_guarded_label_values(label_list),
1570 }
1571 }
1572
1573 pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1574 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1575 BackfillMetrics {
1576 backfill_snapshot_read_row_count: self
1577 .backfill_snapshot_read_row_count
1578 .with_guarded_label_values(label_list),
1579 backfill_upstream_output_row_count: self
1580 .backfill_upstream_output_row_count
1581 .with_guarded_label_values(label_list),
1582 }
1583 }
1584
1585 pub fn new_cdc_backfill_metrics(
1586 &self,
1587 table_id: TableId,
1588 actor_id: ActorId,
1589 ) -> CdcBackfillMetrics {
1590 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1591 CdcBackfillMetrics {
1592 cdc_backfill_snapshot_read_row_count: self
1593 .cdc_backfill_snapshot_read_row_count
1594 .with_guarded_label_values(label_list),
1595 cdc_backfill_upstream_output_row_count: self
1596 .cdc_backfill_upstream_output_row_count
1597 .with_guarded_label_values(label_list),
1598 }
1599 }
1600
1601 pub fn new_over_window_metrics(
1602 &self,
1603 table_id: u32,
1604 actor_id: ActorId,
1605 fragment_id: FragmentId,
1606 ) -> OverWindowMetrics {
1607 let label_list: &[&str; 3] = &[
1608 &table_id.to_string(),
1609 &actor_id.to_string(),
1610 &fragment_id.to_string(),
1611 ];
1612 OverWindowMetrics {
1613 over_window_cached_entry_count: self
1614 .over_window_cached_entry_count
1615 .with_guarded_label_values(label_list),
1616 over_window_cache_lookup_count: self
1617 .over_window_cache_lookup_count
1618 .with_guarded_label_values(label_list),
1619 over_window_cache_miss_count: self
1620 .over_window_cache_miss_count
1621 .with_guarded_label_values(label_list),
1622 over_window_range_cache_entry_count: self
1623 .over_window_range_cache_entry_count
1624 .with_guarded_label_values(label_list),
1625 over_window_range_cache_lookup_count: self
1626 .over_window_range_cache_lookup_count
1627 .with_guarded_label_values(label_list),
1628 over_window_range_cache_left_miss_count: self
1629 .over_window_range_cache_left_miss_count
1630 .with_guarded_label_values(label_list),
1631 over_window_range_cache_right_miss_count: self
1632 .over_window_range_cache_right_miss_count
1633 .with_guarded_label_values(label_list),
1634 over_window_accessed_entry_count: self
1635 .over_window_accessed_entry_count
1636 .with_guarded_label_values(label_list),
1637 over_window_compute_count: self
1638 .over_window_compute_count
1639 .with_guarded_label_values(label_list),
1640 over_window_same_output_count: self
1641 .over_window_same_output_count
1642 .with_guarded_label_values(label_list),
1643 }
1644 }
1645
1646 pub fn new_materialize_metrics(
1647 &self,
1648 table_id: TableId,
1649 actor_id: ActorId,
1650 fragment_id: FragmentId,
1651 ) -> MaterializeMetrics {
1652 let label_list: &[&str; 3] = &[
1653 &actor_id.to_string(),
1654 &table_id.to_string(),
1655 &fragment_id.to_string(),
1656 ];
1657 MaterializeMetrics {
1658 materialize_cache_hit_count: self
1659 .materialize_cache_hit_count
1660 .with_guarded_label_values(label_list),
1661 materialize_data_exist_count: self
1662 .materialize_data_exist_count
1663 .with_guarded_label_values(label_list),
1664 materialize_cache_total_count: self
1665 .materialize_cache_total_count
1666 .with_guarded_label_values(label_list),
1667 materialize_input_row_count: self
1668 .materialize_input_row_count
1669 .with_guarded_label_values(label_list),
1670 materialize_current_epoch: self
1671 .materialize_current_epoch
1672 .with_guarded_label_values(label_list),
1673 }
1674 }
1675}
1676
/// Metric handles built by `StreamingMetrics::new_actor_input_metrics`.
///
/// Both counters are bound to the same three label values: actor id, fragment id and
/// upstream fragment id.
pub(crate) struct ActorInputMetrics {
    // NOTE(review): presumably counts records received on this input — confirm at the
    // call sites that increment it.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    // The `_ns` suffix suggests nanoseconds — confirm at the call sites.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1681
/// Per-actor metric handles built by `StreamingMetrics::new_actor_metrics`.
///
/// Every handle is bound to a single label value, the actor id. The `*_duration` and
/// `*_time` fields are [`LabelGuardedGauge`]s; the `*_cnt` fields are
/// [`LabelGuardedIntGauge`]s.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge,
    pub actor_scheduled_duration: LabelGuardedGauge,
    pub actor_scheduled_cnt: LabelGuardedIntGauge,
    pub actor_fast_poll_duration: LabelGuardedGauge,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge,
    pub actor_slow_poll_duration: LabelGuardedGauge,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge,
    pub actor_poll_duration: LabelGuardedGauge,
    pub actor_poll_cnt: LabelGuardedIntGauge,
    pub actor_idle_duration: LabelGuardedGauge,
    pub actor_idle_cnt: LabelGuardedIntGauge,
}
1696
/// Metric handles built by `StreamingMetrics::new_sink_exec_metrics`.
///
/// All handles are bound to the label values sink id, actor id and fragment id.
pub struct SinkExecutorMetrics {
    pub sink_input_row_count: LabelGuardedIntCounter,
    pub sink_input_bytes: LabelGuardedIntCounter,
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1702
/// Metric handles built by `StreamingMetrics::new_materialize_metrics`.
///
/// All handles are bound to the label values actor id, table id and fragment id.
pub struct MaterializeMetrics {
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    pub materialize_cache_total_count: LabelGuardedIntCounter,
    pub materialize_input_row_count: LabelGuardedIntCounter,
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1710
/// Cache metric handles shared by two constructors on `StreamingMetrics`.
///
/// `new_group_top_n_metrics` fills the fields from the `group_top_n_*` vectors, while
/// `new_append_only_group_top_n_metrics` fills the same fields from the
/// `group_top_n_appendonly_*` vectors. In both cases the handles are bound to the label
/// values table id, actor id and fragment id.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1716
/// Cache metric handles built by `StreamingMetrics::new_lookup_executor_metrics`.
///
/// All handles are bound to the label values table id, actor id and fragment id.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1722
/// Metric handles built by `StreamingMetrics::new_hash_agg_metrics`.
///
/// All handles are bound to the label values table id, actor id and fragment id.
pub struct HashAggMetrics {
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1734
/// Cache metric handles built by `StreamingMetrics::new_agg_distinct_dedup_metrics`.
///
/// All handles are bound to the label values table id, actor id and fragment id.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1740
/// Cache metric handles built by `StreamingMetrics::new_temporal_join_metrics`.
///
/// All handles are bound to the label values table id, actor id and fragment id.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1746
/// Row-count metric handles built by `StreamingMetrics::new_backfill_metrics`.
///
/// Both counters are bound to the label values table id and actor id.
pub struct BackfillMetrics {
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1751
/// Row-count metric handles built by `StreamingMetrics::new_cdc_backfill_metrics`.
///
/// Both counters are bound to the label values table id and actor id.
pub struct CdcBackfillMetrics {
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1756
/// Metric handles built by `StreamingMetrics::new_over_window_metrics`.
///
/// All handles are bound to the label values table id, actor id and fragment id. The two
/// `*_entry_count` cache fields are integer gauges; every other field is a counter.
pub struct OverWindowMetrics {
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    pub over_window_compute_count: LabelGuardedIntCounter,
    pub over_window_same_output_count: LabelGuardedIntCounter,
}