1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry,
33 register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36
37use crate::common::log_store_impl::kv_log_store::{
38 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
39};
40use crate::executor::prelude::ActorId;
41use crate::task::FragmentId;
42
/// Prometheus (and in-memory) metrics for the streaming engine.
///
/// An instance is built by [`StreamingMetrics::new`] against a registry. The
/// process-wide copy is obtained through [`global_streaming_metrics`].
///
/// Each `*Vec<N>` field is a labeled metric family. Its const parameter `N`
/// matches the number of label names it is registered with in `new`.
///
/// `Relabeled*` fields are passed through `relabel_debug_1(level)` in `new`.
/// Presumably that collapses the first label (`actor_id`) unless `level` is at
/// least debug — TODO confirm against `MetricVecRelabelExt`.
///
/// Non-`pub` fields are only reachable inside this module. Presumably they are
/// exposed through helper methods defined later in the file (not visible here).
#[derive(Clone)]
pub struct StreamingMetrics {
    /// Metric level passed to `new`. It is also fed to every `relabel_debug_1` call there.
    pub level: MetricLevel,

    /// `stream_executor_row_count`: rows output from each executor.
    /// Labels: `actor_id`, `fragment_id`, `executor_identity`.
    pub executor_row_count: RelabeledGuardedIntCounterVec<3>,

    /// In-process [`CountMap`] counters (not Prometheus vecs). `new` creates them
    /// with `CountMap::new()`. Presumably keyed per stream node — confirm at use sites.
    pub mem_stream_node_output_row_count: CountMap,
    /// In-process blocking-duration counter (ns); see `mem_stream_node_output_row_count`.
    pub mem_stream_node_output_blocking_duration_ns: CountMap,

    // Per-actor scheduling gauges, all labeled by `actor_id`. Except for
    // `actor_execution_time` ("Total execution time (s) of an actor"), they are
    // registered with the help text "tokio's metrics".
    actor_execution_time: LabelGuardedGaugeVec<1>,
    actor_scheduled_duration: LabelGuardedGaugeVec<1>,
    actor_scheduled_cnt: LabelGuardedIntGaugeVec<1>,
    actor_fast_poll_duration: LabelGuardedGaugeVec<1>,
    actor_fast_poll_cnt: LabelGuardedIntGaugeVec<1>,
    actor_slow_poll_duration: LabelGuardedGaugeVec<1>,
    actor_slow_poll_cnt: LabelGuardedIntGaugeVec<1>,
    actor_poll_duration: LabelGuardedGaugeVec<1>,
    actor_poll_cnt: LabelGuardedIntGaugeVec<1>,
    actor_idle_duration: LabelGuardedGaugeVec<1>,
    actor_idle_cnt: LabelGuardedIntGaugeVec<1>,

    /// `stream_actor_count`: number of actors (parallelism), labeled by `fragment_id`.
    pub actor_count: LabelGuardedIntGaugeVec<1>,
    /// `stream_actor_in_record_cnt`: rows received by an actor.
    /// Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    pub actor_in_record_cnt: RelabeledGuardedIntCounterVec<3>,
    /// `stream_actor_out_record_cnt`: rows sent by an actor. Labels: `actor_id`, `fragment_id`.
    pub actor_out_record_cnt: RelabeledGuardedIntCounterVec<2>,
    /// `stream_actor_current_epoch`: current epoch of an actor. Labels: `actor_id`, `fragment_id`.
    pub actor_current_epoch: RelabeledGuardedIntGaugeVec<2>,

    // Source metrics. Labels: `source_id`, `source_name`, `actor_id`, `fragment_id`.
    /// `stream_source_output_rows_counts`: rows output from source.
    pub source_output_row_count: LabelGuardedIntCounterVec<4>,
    /// `stream_source_split_change_event_count`: split change events handled by source.
    pub source_split_change_count: LabelGuardedIntCounterVec<4>,
    /// `stream_source_backfill_rows_counts`: rows backfilled for source.
    pub source_backfill_row_count: LabelGuardedIntCounterVec<4>,

    // Sink metrics. Labels: `sink_id`, `actor_id`, `fragment_id`.
    /// `stream_sink_input_row_count`: rows streamed into sink executors.
    sink_input_row_count: LabelGuardedIntCounterVec<3>,
    /// `stream_sink_input_bytes`: size of chunks streamed into sink executors.
    sink_input_bytes: LabelGuardedIntCounterVec<3>,
    /// `stream_sink_chunk_buffer_size`: size of chunks buffered in a barrier.
    sink_chunk_buffer_size: LabelGuardedIntGaugeVec<3>,

    /// `stream_exchange_frag_recv_size`: size of messages received from an upstream
    /// fragment. Labels: `up_fragment_id`, `down_fragment_id`.
    pub exchange_frag_recv_size: LabelGuardedIntCounterVec<2>,

    /// `stream_merge_barrier_align_duration` histogram.
    /// Buckets: `exponential_buckets(0.0001, 2.0, 21)`. Labels: `actor_id`, `fragment_id`.
    pub merge_barrier_align_duration: RelabeledGuardedHistogramVec<2>,

    /// `stream_actor_output_buffer_blocking_duration_ns`: total output-buffer blocking
    /// time (ns). Labels: `actor_id`, `fragment_id`, `downstream_fragment_id`.
    pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec<3>,
    /// `stream_actor_input_buffer_blocking_duration_ns`: total input-buffer blocking
    /// time (ns). Labels: `actor_id`, `fragment_id`, `upstream_fragment_id`.
    actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,

    // Join lookup counters. Labels: `side`, `join_table_id`, `actor_id`, `fragment_id`.
    /// `stream_join_lookup_miss_count`. NOTE(review): the registered help text says
    /// "lookup miss duration", but this is an int counter — presumably a miss count.
    pub join_lookup_miss_count: LabelGuardedIntCounterVec<4>,
    /// `stream_join_lookup_total_count`: total join lookups.
    pub join_lookup_total_count: LabelGuardedIntCounterVec<4>,
    /// `stream_join_insert_cache_miss_count`: cache misses on insert.
    pub join_insert_cache_miss_count: LabelGuardedIntCounterVec<4>,
    /// `stream_join_actor_input_waiting_duration_ns`: input-buffer wait time (ns) of a
    /// join actor. Labels: `actor_id`, `fragment_id`.
    pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec<2>,
    /// `stream_join_match_duration_ns`: matching time per side.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_match_duration_ns: LabelGuardedIntCounterVec<3>,
    /// `stream_join_cached_entry_count`: cached entries in join operators.
    /// Labels: `actor_id`, `fragment_id`, `side`.
    pub join_cached_entry_count: LabelGuardedIntGaugeVec<3>,
    /// `stream_join_matched_join_keys` histogram: keys matched on the opposite side.
    /// Buckets: `exponential_buckets(16.0, 2.0, 28)`. Labels: `actor_id`, `fragment_id`, `table_id`.
    pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>,

    /// `stream_barrier_align_duration_ns`.
    /// Labels: `actor_id`, `fragment_id`, `wait_side`, `executor`.
    pub barrier_align_duration: RelabeledGuardedIntCounterVec<4>,

    // Aggregation metrics. Labels: `table_id`, `actor_id`, `fragment_id`.
    // Field -> metric name: `agg_total_lookup_count` -> `stream_agg_lookup_total_count`,
    // `agg_chunk_total_lookup_count` -> `stream_agg_chunk_lookup_total_count`;
    // the others map to `stream_<field name>`.
    agg_lookup_miss_count: LabelGuardedIntCounterVec<3>,
    agg_total_lookup_count: LabelGuardedIntCounterVec<3>,
    agg_cached_entry_count: LabelGuardedIntGaugeVec<3>,
    agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec<3>,
    agg_chunk_total_lookup_count: LabelGuardedIntCounterVec<3>,
    agg_dirty_groups_count: LabelGuardedIntGaugeVec<3>,
    agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec<3>,
    agg_distinct_cache_miss_count: LabelGuardedIntCounterVec<3>,
    agg_distinct_total_cache_count: LabelGuardedIntCounterVec<3>,
    agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec<3>,
    agg_state_cache_lookup_count: LabelGuardedIntCounterVec<3>,
    agg_state_cache_miss_count: LabelGuardedIntCounterVec<3>,

    // Group top-n cache metrics (`stream_<field name>`). Labels: `table_id`, `actor_id`, `fragment_id`.
    group_top_n_cache_miss_count: LabelGuardedIntCounterVec<3>,
    group_top_n_total_query_cache_count: LabelGuardedIntCounterVec<3>,
    group_top_n_cached_entry_count: LabelGuardedIntGaugeVec<3>,
    group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec<3>,
    group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec<3>,
    group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec<3>,

    // Lookup executor cache metrics (`stream_<field name>`). Labels: `table_id`, `actor_id`, `fragment_id`.
    lookup_cache_miss_count: LabelGuardedIntCounterVec<3>,
    lookup_total_query_cache_count: LabelGuardedIntCounterVec<3>,
    lookup_cached_entry_count: LabelGuardedIntGaugeVec<3>,

    // Temporal join cache metrics (`stream_<field name>`). Labels: `table_id`, `actor_id`, `fragment_id`.
    temporal_join_cache_miss_count: LabelGuardedIntCounterVec<3>,
    temporal_join_total_query_cache_count: LabelGuardedIntCounterVec<3>,
    temporal_join_cached_entry_count: LabelGuardedIntGaugeVec<3>,

    // Backfill row counters (`stream_<field name>`). Labels: `table_id`, `actor_id`.
    backfill_snapshot_read_row_count: LabelGuardedIntCounterVec<2>,
    backfill_upstream_output_row_count: LabelGuardedIntCounterVec<2>,

    // CDC backfill row counters (`stream_<field name>`). Labels: `table_id`, `actor_id`.
    cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec<2>,
    cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec<2>,

    /// `stream_snapshot_backfill_consume_snapshot_row_count`: rows output from snapshot
    /// backfill. Labels: `table_id`, `actor_id`, `stage`.
    pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec<3>,

    // Over-window executor metrics (`stream_<field name>`). Labels: `table_id`, `actor_id`, `fragment_id`.
    over_window_cached_entry_count: LabelGuardedIntGaugeVec<3>,
    over_window_cache_lookup_count: LabelGuardedIntCounterVec<3>,
    over_window_cache_miss_count: LabelGuardedIntCounterVec<3>,
    over_window_range_cache_entry_count: LabelGuardedIntGaugeVec<3>,
    over_window_range_cache_lookup_count: LabelGuardedIntCounterVec<3>,
    over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec<3>,
    over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec<3>,
    over_window_accessed_entry_count: LabelGuardedIntCounterVec<3>,
    over_window_compute_count: LabelGuardedIntCounterVec<3>,
    over_window_same_output_count: LabelGuardedIntCounterVec<3>,

    /// `stream_barrier_inflight_duration_seconds` histogram.
    /// Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_inflight_latency: Histogram,
    /// `stream_barrier_sync_storage_duration_seconds` histogram.
    /// Buckets: `exponential_buckets(0.1, 1.5, 16)`.
    pub barrier_sync_latency: Histogram,
    /// `stream_barrier_batch_size` histogram. Buckets: `exponential_buckets(1.0, 2.0, 8)`.
    pub barrier_batch_size: Histogram,
    /// `stream_barrier_manager_progress`: number of actors that have processed the
    /// earliest in-flight barriers.
    pub barrier_manager_progress: IntCounter,

    // KV log store metrics. Visible registrations use labels `actor_id`,
    // `connector`, `sink_id`, `sink_name`; the read counters add `read_type`.
    /// `kv_log_store_storage_write_count`: written row count.
    pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>,
    /// `kv_log_store_storage_write_size`: written size.
    pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>,
    /// Registration not visible in this chunk.
    pub kv_log_store_rewind_count: LabelGuardedIntCounterVec<4>,
    /// Registration not visible in this chunk. Presumably bucketed with the imported
    /// `REWIND_BASE_DELAY`/`REWIND_BACKOFF_FACTOR`/`REWIND_MAX_DELAY` — confirm.
    pub kv_log_store_rewind_delay: LabelGuardedHistogramVec<4>,
    /// `kv_log_store_storage_read_count`. NOTE(review): its registered help text reads
    /// "Write row count throughput of kv log store" — looks copy-pasted from the
    /// write counter.
    pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec<5>,
    /// Registration not visible in this chunk.
    pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec<5>,
    // Unconsumed-buffer gauges; registration not visible in this chunk.
    pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec<4>,
    pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec<4>,
    pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>,
    pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>,

    // Sync KV log store metrics, registered under their field names. Base labels:
    // `actor_id`, `target`, `fragment_id`, `relation`. The read counters prepend a
    // `type` label, and `sync_kv_log_store_state` prepends a `state` label
    // ("clean/unclean state transition").
    pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec<5>,
    pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec<5>,
    pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec<4>,
    pub sync_kv_log_store_state: LabelGuardedIntCounterVec<5>,
    pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec<4>,
    pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>,
    pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>,
    pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec<4>,
    pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec<4>,
    pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>,
    /// NOTE(review): registered with the same help text as the epoch-count gauge
    /// ("Number of Unconsumed Epoch in buffer"), which does not describe a minimum epoch.
    pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>,

    // Process-wide gauges/counters (LRU manager, allocator and JVM memory, stream
    // memory usage). Their registrations are not visible in this chunk; see `new`.
    pub lru_runtime_loop_count: IntCounter,
    pub lru_latest_sequence: IntGauge,
    pub lru_watermark_sequence: IntGauge,
    pub lru_eviction_policy: IntGauge,
    pub jemalloc_allocated_bytes: IntGauge,
    pub jemalloc_active_bytes: IntGauge,
    pub jemalloc_resident_bytes: IntGauge,
    pub jemalloc_metadata_bytes: IntGauge,
    pub jvm_allocated_bytes: IntGauge,
    pub jvm_active_bytes: IntGauge,
    pub stream_memory_usage: RelabeledGuardedIntGaugeVec<3>,

    // Materialize executor metrics. Cache hit/total registrations are not visible in
    // this chunk.
    materialize_cache_hit_count: RelabeledGuardedIntCounterVec<3>,
    materialize_cache_total_count: RelabeledGuardedIntCounterVec<3>,
    /// `stream_mview_input_row_count`: rows streamed into materialize executors.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    materialize_input_row_count: RelabeledGuardedIntCounterVec<3>,
    /// `stream_mview_current_epoch`: current epoch of the materialize executor.
    /// Labels: `actor_id`, `table_id`, `fragment_id`.
    pub materialize_current_epoch: RelabeledGuardedIntGaugeVec<3>,
}
216
217pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
218
219pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
220 GLOBAL_STREAMING_METRICS
221 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
222 .clone()
223}
224
225impl StreamingMetrics {
226 fn new(registry: &Registry, level: MetricLevel) -> Self {
227 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
228 "stream_executor_row_count",
229 "Total number of rows that have been output from each executor",
230 &["actor_id", "fragment_id", "executor_identity"],
231 registry
232 )
233 .unwrap()
234 .relabel_debug_1(level);
235
236 let stream_node_output_row_count = CountMap::new();
237 let stream_node_output_blocking_duration_ns = CountMap::new();
238
239 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
240 "stream_source_output_rows_counts",
241 "Total number of rows that have been output from source",
242 &["source_id", "source_name", "actor_id", "fragment_id"],
243 registry
244 )
245 .unwrap();
246
247 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
248 "stream_source_split_change_event_count",
249 "Total number of split change events that have been operated by source",
250 &["source_id", "source_name", "actor_id", "fragment_id"],
251 registry
252 )
253 .unwrap();
254
255 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
256 "stream_source_backfill_rows_counts",
257 "Total number of rows that have been backfilled for source",
258 &["source_id", "source_name", "actor_id", "fragment_id"],
259 registry
260 )
261 .unwrap();
262
263 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
264 "stream_sink_input_row_count",
265 "Total number of rows streamed into sink executors",
266 &["sink_id", "actor_id", "fragment_id"],
267 registry
268 )
269 .unwrap();
270
271 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
272 "stream_sink_input_bytes",
273 "Total size of chunks streamed into sink executors",
274 &["sink_id", "actor_id", "fragment_id"],
275 registry
276 )
277 .unwrap();
278
279 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
280 "stream_mview_input_row_count",
281 "Total number of rows streamed into materialize executors",
282 &["actor_id", "table_id", "fragment_id"],
283 registry
284 )
285 .unwrap()
286 .relabel_debug_1(level);
287
288 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
289 "stream_mview_current_epoch",
290 "The current epoch of the materialized executor",
291 &["actor_id", "table_id", "fragment_id"],
292 registry
293 )
294 .unwrap()
295 .relabel_debug_1(level);
296
297 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
298 "stream_sink_chunk_buffer_size",
299 "Total size of chunks buffered in a barrier",
300 &["sink_id", "actor_id", "fragment_id"],
301 registry
302 )
303 .unwrap();
304
305 let actor_execution_time = register_guarded_gauge_vec_with_registry!(
306 "stream_actor_actor_execution_time",
307 "Total execution time (s) of an actor",
308 &["actor_id"],
309 registry
310 )
311 .unwrap();
312
313 let actor_output_buffer_blocking_duration_ns =
314 register_guarded_int_counter_vec_with_registry!(
315 "stream_actor_output_buffer_blocking_duration_ns",
316 "Total blocking duration (ns) of output buffer",
317 &["actor_id", "fragment_id", "downstream_fragment_id"],
318 registry
319 )
320 .unwrap()
321 .relabel_debug_1(level);
323
324 let actor_input_buffer_blocking_duration_ns =
325 register_guarded_int_counter_vec_with_registry!(
326 "stream_actor_input_buffer_blocking_duration_ns",
327 "Total blocking duration (ns) of input buffer",
328 &["actor_id", "fragment_id", "upstream_fragment_id"],
329 registry
330 )
331 .unwrap();
332
333 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
334 "stream_exchange_frag_recv_size",
335 "Total size of messages that have been received from upstream Fragment",
336 &["up_fragment_id", "down_fragment_id"],
337 registry
338 )
339 .unwrap();
340
341 let actor_fast_poll_duration = register_guarded_gauge_vec_with_registry!(
342 "stream_actor_fast_poll_duration",
343 "tokio's metrics",
344 &["actor_id"],
345 registry
346 )
347 .unwrap();
348
349 let actor_fast_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
350 "stream_actor_fast_poll_cnt",
351 "tokio's metrics",
352 &["actor_id"],
353 registry
354 )
355 .unwrap();
356
357 let actor_slow_poll_duration = register_guarded_gauge_vec_with_registry!(
358 "stream_actor_slow_poll_duration",
359 "tokio's metrics",
360 &["actor_id"],
361 registry
362 )
363 .unwrap();
364
365 let actor_slow_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
366 "stream_actor_slow_poll_cnt",
367 "tokio's metrics",
368 &["actor_id"],
369 registry
370 )
371 .unwrap();
372
373 let actor_poll_duration = register_guarded_gauge_vec_with_registry!(
374 "stream_actor_poll_duration",
375 "tokio's metrics",
376 &["actor_id"],
377 registry
378 )
379 .unwrap();
380
381 let actor_poll_cnt = register_guarded_int_gauge_vec_with_registry!(
382 "stream_actor_poll_cnt",
383 "tokio's metrics",
384 &["actor_id"],
385 registry
386 )
387 .unwrap();
388
389 let actor_scheduled_duration = register_guarded_gauge_vec_with_registry!(
390 "stream_actor_scheduled_duration",
391 "tokio's metrics",
392 &["actor_id"],
393 registry
394 )
395 .unwrap();
396
397 let actor_scheduled_cnt = register_guarded_int_gauge_vec_with_registry!(
398 "stream_actor_scheduled_cnt",
399 "tokio's metrics",
400 &["actor_id"],
401 registry
402 )
403 .unwrap();
404
405 let actor_idle_duration = register_guarded_gauge_vec_with_registry!(
406 "stream_actor_idle_duration",
407 "tokio's metrics",
408 &["actor_id"],
409 registry
410 )
411 .unwrap();
412
413 let actor_idle_cnt = register_guarded_int_gauge_vec_with_registry!(
414 "stream_actor_idle_cnt",
415 "tokio's metrics",
416 &["actor_id"],
417 registry
418 )
419 .unwrap();
420
421 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
422 "stream_actor_in_record_cnt",
423 "Total number of rows actor received",
424 &["actor_id", "fragment_id", "upstream_fragment_id"],
425 registry
426 )
427 .unwrap()
428 .relabel_debug_1(level);
429
430 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
431 "stream_actor_out_record_cnt",
432 "Total number of rows actor sent",
433 &["actor_id", "fragment_id"],
434 registry
435 )
436 .unwrap()
437 .relabel_debug_1(level);
438
439 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
440 "stream_actor_current_epoch",
441 "Current epoch of actor",
442 &["actor_id", "fragment_id"],
443 registry
444 )
445 .unwrap()
446 .relabel_debug_1(level);
447
448 let actor_count = register_guarded_int_gauge_vec_with_registry!(
449 "stream_actor_count",
450 "Total number of actors (parallelism)",
451 &["fragment_id"],
452 registry
453 )
454 .unwrap();
455
456 let opts = histogram_opts!(
457 "stream_merge_barrier_align_duration",
458 "Duration of merge align barrier",
459 exponential_buckets(0.0001, 2.0, 21).unwrap() );
461 let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
462 opts,
463 &["actor_id", "fragment_id"],
464 registry
465 )
466 .unwrap()
467 .relabel_debug_1(level);
468
469 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
470 "stream_join_lookup_miss_count",
471 "Join executor lookup miss duration",
472 &["side", "join_table_id", "actor_id", "fragment_id"],
473 registry
474 )
475 .unwrap();
476
477 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
478 "stream_join_lookup_total_count",
479 "Join executor lookup total operation",
480 &["side", "join_table_id", "actor_id", "fragment_id"],
481 registry
482 )
483 .unwrap();
484
485 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
486 "stream_join_insert_cache_miss_count",
487 "Join executor cache miss when insert operation",
488 &["side", "join_table_id", "actor_id", "fragment_id"],
489 registry
490 )
491 .unwrap();
492
493 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
494 "stream_join_actor_input_waiting_duration_ns",
495 "Total waiting duration (ns) of input buffer of join actor",
496 &["actor_id", "fragment_id"],
497 registry
498 )
499 .unwrap();
500
501 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
502 "stream_join_match_duration_ns",
503 "Matching duration for each side",
504 &["actor_id", "fragment_id", "side"],
505 registry
506 )
507 .unwrap();
508
509 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
510 "stream_barrier_align_duration_ns",
511 "Duration of join align barrier",
512 &["actor_id", "fragment_id", "wait_side", "executor"],
513 registry
514 )
515 .unwrap()
516 .relabel_debug_1(level);
517
518 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
519 "stream_join_cached_entry_count",
520 "Number of cached entries in streaming join operators",
521 &["actor_id", "fragment_id", "side"],
522 registry
523 )
524 .unwrap();
525
526 let join_matched_join_keys_opts = histogram_opts!(
527 "stream_join_matched_join_keys",
528 "The number of keys matched in the opposite side",
529 exponential_buckets(16.0, 2.0, 28).unwrap() );
531
532 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
533 join_matched_join_keys_opts,
534 &["actor_id", "fragment_id", "table_id"],
535 registry
536 )
537 .unwrap()
538 .relabel_debug_1(level);
539
540 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
541 "stream_agg_lookup_miss_count",
542 "Aggregation executor lookup miss duration",
543 &["table_id", "actor_id", "fragment_id"],
544 registry
545 )
546 .unwrap();
547
548 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
549 "stream_agg_lookup_total_count",
550 "Aggregation executor lookup total operation",
551 &["table_id", "actor_id", "fragment_id"],
552 registry
553 )
554 .unwrap();
555
556 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
557 "stream_agg_distinct_cache_miss_count",
558 "Aggregation executor dinsinct miss duration",
559 &["table_id", "actor_id", "fragment_id"],
560 registry
561 )
562 .unwrap();
563
564 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
565 "stream_agg_distinct_total_cache_count",
566 "Aggregation executor distinct total operation",
567 &["table_id", "actor_id", "fragment_id"],
568 registry
569 )
570 .unwrap();
571
572 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
573 "stream_agg_distinct_cached_entry_count",
574 "Total entry counts in distinct aggregation executor cache",
575 &["table_id", "actor_id", "fragment_id"],
576 registry
577 )
578 .unwrap();
579
580 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
581 "stream_agg_dirty_groups_count",
582 "Total dirty group counts in aggregation executor",
583 &["table_id", "actor_id", "fragment_id"],
584 registry
585 )
586 .unwrap();
587
588 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
589 "stream_agg_dirty_groups_heap_size",
590 "Total dirty group heap size in aggregation executor",
591 &["table_id", "actor_id", "fragment_id"],
592 registry
593 )
594 .unwrap();
595
596 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
597 "stream_agg_state_cache_lookup_count",
598 "Aggregation executor state cache lookup count",
599 &["table_id", "actor_id", "fragment_id"],
600 registry
601 )
602 .unwrap();
603
604 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
605 "stream_agg_state_cache_miss_count",
606 "Aggregation executor state cache miss count",
607 &["table_id", "actor_id", "fragment_id"],
608 registry
609 )
610 .unwrap();
611
612 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
613 "stream_group_top_n_cache_miss_count",
614 "Group top n executor cache miss count",
615 &["table_id", "actor_id", "fragment_id"],
616 registry
617 )
618 .unwrap();
619
620 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
621 "stream_group_top_n_total_query_cache_count",
622 "Group top n executor query cache total count",
623 &["table_id", "actor_id", "fragment_id"],
624 registry
625 )
626 .unwrap();
627
628 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
629 "stream_group_top_n_cached_entry_count",
630 "Total entry counts in group top n executor cache",
631 &["table_id", "actor_id", "fragment_id"],
632 registry
633 )
634 .unwrap();
635
636 let group_top_n_appendonly_cache_miss_count =
637 register_guarded_int_counter_vec_with_registry!(
638 "stream_group_top_n_appendonly_cache_miss_count",
639 "Group top n appendonly executor cache miss count",
640 &["table_id", "actor_id", "fragment_id"],
641 registry
642 )
643 .unwrap();
644
645 let group_top_n_appendonly_total_query_cache_count =
646 register_guarded_int_counter_vec_with_registry!(
647 "stream_group_top_n_appendonly_total_query_cache_count",
648 "Group top n appendonly executor total cache count",
649 &["table_id", "actor_id", "fragment_id"],
650 registry
651 )
652 .unwrap();
653
654 let group_top_n_appendonly_cached_entry_count =
655 register_guarded_int_gauge_vec_with_registry!(
656 "stream_group_top_n_appendonly_cached_entry_count",
657 "Total entry counts in group top n appendonly executor cache",
658 &["table_id", "actor_id", "fragment_id"],
659 registry
660 )
661 .unwrap();
662
663 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
664 "stream_lookup_cache_miss_count",
665 "Lookup executor cache miss count",
666 &["table_id", "actor_id", "fragment_id"],
667 registry
668 )
669 .unwrap();
670
671 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
672 "stream_lookup_total_query_cache_count",
673 "Lookup executor query cache total count",
674 &["table_id", "actor_id", "fragment_id"],
675 registry
676 )
677 .unwrap();
678
679 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
680 "stream_lookup_cached_entry_count",
681 "Total entry counts in lookup executor cache",
682 &["table_id", "actor_id", "fragment_id"],
683 registry
684 )
685 .unwrap();
686
687 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
688 "stream_temporal_join_cache_miss_count",
689 "Temporal join executor cache miss count",
690 &["table_id", "actor_id", "fragment_id"],
691 registry
692 )
693 .unwrap();
694
695 let temporal_join_total_query_cache_count =
696 register_guarded_int_counter_vec_with_registry!(
697 "stream_temporal_join_total_query_cache_count",
698 "Temporal join executor query cache total count",
699 &["table_id", "actor_id", "fragment_id"],
700 registry
701 )
702 .unwrap();
703
704 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
705 "stream_temporal_join_cached_entry_count",
706 "Total entry count in temporal join executor cache",
707 &["table_id", "actor_id", "fragment_id"],
708 registry
709 )
710 .unwrap();
711
712 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
713 "stream_agg_cached_entry_count",
714 "Number of cached keys in streaming aggregation operators",
715 &["table_id", "actor_id", "fragment_id"],
716 registry
717 )
718 .unwrap();
719
720 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
721 "stream_agg_chunk_lookup_miss_count",
722 "Aggregation executor chunk-level lookup miss duration",
723 &["table_id", "actor_id", "fragment_id"],
724 registry
725 )
726 .unwrap();
727
728 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
729 "stream_agg_chunk_lookup_total_count",
730 "Aggregation executor chunk-level lookup total operation",
731 &["table_id", "actor_id", "fragment_id"],
732 registry
733 )
734 .unwrap();
735
736 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
737 "stream_backfill_snapshot_read_row_count",
738 "Total number of rows that have been read from the backfill snapshot",
739 &["table_id", "actor_id"],
740 registry
741 )
742 .unwrap();
743
744 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
745 "stream_backfill_upstream_output_row_count",
746 "Total number of rows that have been output from the backfill upstream",
747 &["table_id", "actor_id"],
748 registry
749 )
750 .unwrap();
751
752 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
753 "stream_cdc_backfill_snapshot_read_row_count",
754 "Total number of rows that have been read from the cdc_backfill snapshot",
755 &["table_id", "actor_id"],
756 registry
757 )
758 .unwrap();
759
760 let cdc_backfill_upstream_output_row_count =
761 register_guarded_int_counter_vec_with_registry!(
762 "stream_cdc_backfill_upstream_output_row_count",
763 "Total number of rows that have been output from the cdc_backfill upstream",
764 &["table_id", "actor_id"],
765 registry
766 )
767 .unwrap();
768
769 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
770 "stream_snapshot_backfill_consume_snapshot_row_count",
771 "Total number of rows that have been output from snapshot backfill",
772 &["table_id", "actor_id", "stage"],
773 registry
774 )
775 .unwrap();
776
777 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
778 "stream_over_window_cached_entry_count",
779 "Total entry (partition) count in over window executor cache",
780 &["table_id", "actor_id", "fragment_id"],
781 registry
782 )
783 .unwrap();
784
785 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
786 "stream_over_window_cache_lookup_count",
787 "Over window executor cache lookup count",
788 &["table_id", "actor_id", "fragment_id"],
789 registry
790 )
791 .unwrap();
792
793 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
794 "stream_over_window_cache_miss_count",
795 "Over window executor cache miss count",
796 &["table_id", "actor_id", "fragment_id"],
797 registry
798 )
799 .unwrap();
800
801 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
802 "stream_over_window_range_cache_entry_count",
803 "Over window partition range cache entry count",
804 &["table_id", "actor_id", "fragment_id"],
805 registry,
806 )
807 .unwrap();
808
809 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
810 "stream_over_window_range_cache_lookup_count",
811 "Over window partition range cache lookup count",
812 &["table_id", "actor_id", "fragment_id"],
813 registry
814 )
815 .unwrap();
816
817 let over_window_range_cache_left_miss_count =
818 register_guarded_int_counter_vec_with_registry!(
819 "stream_over_window_range_cache_left_miss_count",
820 "Over window partition range cache left miss count",
821 &["table_id", "actor_id", "fragment_id"],
822 registry
823 )
824 .unwrap();
825
826 let over_window_range_cache_right_miss_count =
827 register_guarded_int_counter_vec_with_registry!(
828 "stream_over_window_range_cache_right_miss_count",
829 "Over window partition range cache right miss count",
830 &["table_id", "actor_id", "fragment_id"],
831 registry
832 )
833 .unwrap();
834
835 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
836 "stream_over_window_accessed_entry_count",
837 "Over window accessed entry count",
838 &["table_id", "actor_id", "fragment_id"],
839 registry
840 )
841 .unwrap();
842
843 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
844 "stream_over_window_compute_count",
845 "Over window compute count",
846 &["table_id", "actor_id", "fragment_id"],
847 registry
848 )
849 .unwrap();
850
851 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
852 "stream_over_window_same_output_count",
853 "Over window same output count",
854 &["table_id", "actor_id", "fragment_id"],
855 registry
856 )
857 .unwrap();
858
859 let opts = histogram_opts!(
860 "stream_barrier_inflight_duration_seconds",
861 "barrier_inflight_latency",
862 exponential_buckets(0.1, 1.5, 16).unwrap() );
864 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
865
866 let opts = histogram_opts!(
867 "stream_barrier_sync_storage_duration_seconds",
868 "barrier_sync_latency",
869 exponential_buckets(0.1, 1.5, 16).unwrap() );
871 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
872
873 let opts = histogram_opts!(
874 "stream_barrier_batch_size",
875 "barrier_batch_size",
876 exponential_buckets(1.0, 2.0, 8).unwrap()
877 );
878 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
879
880 let barrier_manager_progress = register_int_counter_with_registry!(
881 "stream_barrier_manager_progress",
882 "The number of actors that have processed the earliest in-flight barriers",
883 registry
884 )
885 .unwrap();
886
887 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
888 "sync_kv_log_store_wait_next_poll_ns",
889 "Total duration (ns) of waiting for next poll",
890 &["actor_id", "target", "fragment_id", "relation"],
891 registry
892 )
893 .unwrap();
894
895 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
896 "sync_kv_log_store_read_count",
897 "read row count throughput of sync_kv log store",
898 &["type", "actor_id", "target", "fragment_id", "relation"],
899 registry
900 )
901 .unwrap();
902
903 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
904 "sync_kv_log_store_read_size",
905 "read size throughput of sync_kv log store",
906 &["type", "actor_id", "target", "fragment_id", "relation"],
907 registry
908 )
909 .unwrap();
910
911 let sync_kv_log_store_write_pause_duration_ns =
912 register_guarded_int_counter_vec_with_registry!(
913 "sync_kv_log_store_write_pause_duration_ns",
914 "Duration (ns) of sync_kv log store write pause",
915 &["actor_id", "target", "fragment_id", "relation"],
916 registry
917 )
918 .unwrap();
919
920 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
921 "sync_kv_log_store_state",
922 "clean/unclean state transition for sync_kv log store",
923 &["state", "actor_id", "target", "fragment_id", "relation"],
924 registry
925 )
926 .unwrap();
927
928 let sync_kv_log_store_storage_write_count =
929 register_guarded_int_counter_vec_with_registry!(
930 "sync_kv_log_store_storage_write_count",
931 "Write row count throughput of sync_kv log store",
932 &["actor_id", "target", "fragment_id", "relation"],
933 registry
934 )
935 .unwrap();
936
937 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
938 "sync_kv_log_store_storage_write_size",
939 "Write size throughput of sync_kv log store",
940 &["actor_id", "target", "fragment_id", "relation"],
941 registry
942 )
943 .unwrap();
944
945 let sync_kv_log_store_buffer_unconsumed_item_count =
946 register_guarded_int_gauge_vec_with_registry!(
947 "sync_kv_log_store_buffer_unconsumed_item_count",
948 "Number of Unconsumed Item in buffer",
949 &["actor_id", "target", "fragment_id", "relation"],
950 registry
951 )
952 .unwrap();
953
954 let sync_kv_log_store_buffer_unconsumed_row_count =
955 register_guarded_int_gauge_vec_with_registry!(
956 "sync_kv_log_store_buffer_unconsumed_row_count",
957 "Number of Unconsumed Row in buffer",
958 &["actor_id", "target", "fragment_id", "relation"],
959 registry
960 )
961 .unwrap();
962
963 let sync_kv_log_store_buffer_unconsumed_epoch_count =
964 register_guarded_int_gauge_vec_with_registry!(
965 "sync_kv_log_store_buffer_unconsumed_epoch_count",
966 "Number of Unconsumed Epoch in buffer",
967 &["actor_id", "target", "fragment_id", "relation"],
968 registry
969 )
970 .unwrap();
971
972 let sync_kv_log_store_buffer_unconsumed_min_epoch =
973 register_guarded_int_gauge_vec_with_registry!(
974 "sync_kv_log_store_buffer_unconsumed_min_epoch",
975 "Number of Unconsumed Epoch in buffer",
976 &["actor_id", "target", "fragment_id", "relation"],
977 registry
978 )
979 .unwrap();
980
981 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
982 "kv_log_store_storage_write_count",
983 "Write row count throughput of kv log store",
984 &["actor_id", "connector", "sink_id", "sink_name"],
985 registry
986 )
987 .unwrap();
988
989 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
990 "kv_log_store_storage_write_size",
991 "Write size throughput of kv log store",
992 &["actor_id", "connector", "sink_id", "sink_name"],
993 registry
994 )
995 .unwrap();
996
997 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
998 "kv_log_store_storage_read_count",
999 "Write row count throughput of kv log store",
1000 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1001 registry
1002 )
1003 .unwrap();
1004
1005 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1006 "kv_log_store_storage_read_size",
1007 "Write size throughput of kv log store",
1008 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1009 registry
1010 )
1011 .unwrap();
1012
1013 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1014 "kv_log_store_rewind_count",
1015 "Kv log store rewind rate",
1016 &["actor_id", "connector", "sink_id", "sink_name"],
1017 registry
1018 )
1019 .unwrap();
1020
1021 let kv_log_store_rewind_delay_opts = {
1022 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1023 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1024 - REWIND_BASE_DELAY.as_secs_f64().log2())
1025 .ceil() as usize;
1026 let buckets = exponential_buckets(
1027 REWIND_BASE_DELAY.as_secs_f64(),
1028 REWIND_BACKOFF_FACTOR as _,
1029 bucket_count,
1030 )
1031 .unwrap();
1032 histogram_opts!(
1033 "kv_log_store_rewind_delay",
1034 "Kv log store rewind delay",
1035 buckets,
1036 )
1037 };
1038
1039 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1040 kv_log_store_rewind_delay_opts,
1041 &["actor_id", "connector", "sink_id", "sink_name"],
1042 registry
1043 )
1044 .unwrap();
1045
1046 let kv_log_store_buffer_unconsumed_item_count =
1047 register_guarded_int_gauge_vec_with_registry!(
1048 "kv_log_store_buffer_unconsumed_item_count",
1049 "Number of Unconsumed Item in buffer",
1050 &["actor_id", "connector", "sink_id", "sink_name"],
1051 registry
1052 )
1053 .unwrap();
1054
1055 let kv_log_store_buffer_unconsumed_row_count =
1056 register_guarded_int_gauge_vec_with_registry!(
1057 "kv_log_store_buffer_unconsumed_row_count",
1058 "Number of Unconsumed Row in buffer",
1059 &["actor_id", "connector", "sink_id", "sink_name"],
1060 registry
1061 )
1062 .unwrap();
1063
1064 let kv_log_store_buffer_unconsumed_epoch_count =
1065 register_guarded_int_gauge_vec_with_registry!(
1066 "kv_log_store_buffer_unconsumed_epoch_count",
1067 "Number of Unconsumed Epoch in buffer",
1068 &["actor_id", "connector", "sink_id", "sink_name"],
1069 registry
1070 )
1071 .unwrap();
1072
1073 let kv_log_store_buffer_unconsumed_min_epoch =
1074 register_guarded_int_gauge_vec_with_registry!(
1075 "kv_log_store_buffer_unconsumed_min_epoch",
1076 "Number of Unconsumed Epoch in buffer",
1077 &["actor_id", "connector", "sink_id", "sink_name"],
1078 registry
1079 )
1080 .unwrap();
1081
1082 let lru_runtime_loop_count = register_int_counter_with_registry!(
1083 "lru_runtime_loop_count",
1084 "The counts of the eviction loop in LRU manager per second",
1085 registry
1086 )
1087 .unwrap();
1088
1089 let lru_latest_sequence = register_int_gauge_with_registry!(
1090 "lru_latest_sequence",
1091 "Current LRU global sequence",
1092 registry,
1093 )
1094 .unwrap();
1095
1096 let lru_watermark_sequence = register_int_gauge_with_registry!(
1097 "lru_watermark_sequence",
1098 "Current LRU watermark sequence",
1099 registry,
1100 )
1101 .unwrap();
1102
1103 let lru_eviction_policy = register_int_gauge_with_registry!(
1104 "lru_eviction_policy",
1105 "Current LRU eviction policy",
1106 registry,
1107 )
1108 .unwrap();
1109
1110 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1111 "jemalloc_allocated_bytes",
1112 "The allocated memory jemalloc, got from jemalloc_ctl",
1113 registry
1114 )
1115 .unwrap();
1116
1117 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1118 "jemalloc_active_bytes",
1119 "The active memory jemalloc, got from jemalloc_ctl",
1120 registry
1121 )
1122 .unwrap();
1123
1124 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1125 "jemalloc_resident_bytes",
1126 "The active memory jemalloc, got from jemalloc_ctl",
1127 registry
1128 )
1129 .unwrap();
1130
1131 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1132 "jemalloc_metadata_bytes",
1133 "The active memory jemalloc, got from jemalloc_ctl",
1134 registry
1135 )
1136 .unwrap();
1137
1138 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1139 "jvm_allocated_bytes",
1140 "The allocated jvm memory",
1141 registry
1142 )
1143 .unwrap();
1144
1145 let jvm_active_bytes = register_int_gauge_with_registry!(
1146 "jvm_active_bytes",
1147 "The active jvm memory",
1148 registry
1149 )
1150 .unwrap();
1151
1152 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1153 "stream_materialize_cache_hit_count",
1154 "Materialize executor cache hit count",
1155 &["actor_id", "table_id", "fragment_id"],
1156 registry
1157 )
1158 .unwrap()
1159 .relabel_debug_1(level);
1160
1161 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1162 "stream_materialize_cache_total_count",
1163 "Materialize executor cache total operation",
1164 &["actor_id", "table_id", "fragment_id"],
1165 registry
1166 )
1167 .unwrap()
1168 .relabel_debug_1(level);
1169
1170 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1171 "stream_memory_usage",
1172 "Memory usage for stream executors",
1173 &["actor_id", "table_id", "desc"],
1174 registry
1175 )
1176 .unwrap()
1177 .relabel_debug_1(level);
1178
1179 Self {
1180 level,
1181 executor_row_count,
1182 mem_stream_node_output_row_count: stream_node_output_row_count,
1183 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1184 actor_execution_time,
1185 actor_scheduled_duration,
1186 actor_scheduled_cnt,
1187 actor_fast_poll_duration,
1188 actor_fast_poll_cnt,
1189 actor_slow_poll_duration,
1190 actor_slow_poll_cnt,
1191 actor_poll_duration,
1192 actor_poll_cnt,
1193 actor_idle_duration,
1194 actor_idle_cnt,
1195 actor_count,
1196 actor_in_record_cnt,
1197 actor_out_record_cnt,
1198 actor_current_epoch,
1199 source_output_row_count,
1200 source_split_change_count,
1201 source_backfill_row_count,
1202 sink_input_row_count,
1203 sink_input_bytes,
1204 sink_chunk_buffer_size,
1205 exchange_frag_recv_size,
1206 merge_barrier_align_duration,
1207 actor_output_buffer_blocking_duration_ns,
1208 actor_input_buffer_blocking_duration_ns,
1209 join_lookup_miss_count,
1210 join_lookup_total_count,
1211 join_insert_cache_miss_count,
1212 join_actor_input_waiting_duration_ns,
1213 join_match_duration_ns,
1214 join_cached_entry_count,
1215 join_matched_join_keys,
1216 barrier_align_duration,
1217 agg_lookup_miss_count,
1218 agg_total_lookup_count,
1219 agg_cached_entry_count,
1220 agg_chunk_lookup_miss_count,
1221 agg_chunk_total_lookup_count,
1222 agg_dirty_groups_count,
1223 agg_dirty_groups_heap_size,
1224 agg_distinct_cache_miss_count,
1225 agg_distinct_total_cache_count,
1226 agg_distinct_cached_entry_count,
1227 agg_state_cache_lookup_count,
1228 agg_state_cache_miss_count,
1229 group_top_n_cache_miss_count,
1230 group_top_n_total_query_cache_count,
1231 group_top_n_cached_entry_count,
1232 group_top_n_appendonly_cache_miss_count,
1233 group_top_n_appendonly_total_query_cache_count,
1234 group_top_n_appendonly_cached_entry_count,
1235 lookup_cache_miss_count,
1236 lookup_total_query_cache_count,
1237 lookup_cached_entry_count,
1238 temporal_join_cache_miss_count,
1239 temporal_join_total_query_cache_count,
1240 temporal_join_cached_entry_count,
1241 backfill_snapshot_read_row_count,
1242 backfill_upstream_output_row_count,
1243 cdc_backfill_snapshot_read_row_count,
1244 cdc_backfill_upstream_output_row_count,
1245 snapshot_backfill_consume_row_count,
1246 over_window_cached_entry_count,
1247 over_window_cache_lookup_count,
1248 over_window_cache_miss_count,
1249 over_window_range_cache_entry_count,
1250 over_window_range_cache_lookup_count,
1251 over_window_range_cache_left_miss_count,
1252 over_window_range_cache_right_miss_count,
1253 over_window_accessed_entry_count,
1254 over_window_compute_count,
1255 over_window_same_output_count,
1256 barrier_inflight_latency,
1257 barrier_sync_latency,
1258 barrier_batch_size,
1259 barrier_manager_progress,
1260 kv_log_store_storage_write_count,
1261 kv_log_store_storage_write_size,
1262 kv_log_store_rewind_count,
1263 kv_log_store_rewind_delay,
1264 kv_log_store_storage_read_count,
1265 kv_log_store_storage_read_size,
1266 kv_log_store_buffer_unconsumed_item_count,
1267 kv_log_store_buffer_unconsumed_row_count,
1268 kv_log_store_buffer_unconsumed_epoch_count,
1269 kv_log_store_buffer_unconsumed_min_epoch,
1270 sync_kv_log_store_read_count,
1271 sync_kv_log_store_read_size,
1272 sync_kv_log_store_write_pause_duration_ns,
1273 sync_kv_log_store_state,
1274 sync_kv_log_store_wait_next_poll_ns,
1275 sync_kv_log_store_storage_write_count,
1276 sync_kv_log_store_storage_write_size,
1277 sync_kv_log_store_buffer_unconsumed_item_count,
1278 sync_kv_log_store_buffer_unconsumed_row_count,
1279 sync_kv_log_store_buffer_unconsumed_epoch_count,
1280 sync_kv_log_store_buffer_unconsumed_min_epoch,
1281 lru_runtime_loop_count,
1282 lru_latest_sequence,
1283 lru_watermark_sequence,
1284 lru_eviction_policy,
1285 jemalloc_allocated_bytes,
1286 jemalloc_active_bytes,
1287 jemalloc_resident_bytes,
1288 jemalloc_metadata_bytes,
1289 jvm_allocated_bytes,
1290 jvm_active_bytes,
1291 stream_memory_usage,
1292 materialize_cache_hit_count,
1293 materialize_cache_total_count,
1294 materialize_input_row_count,
1295 materialize_current_epoch,
1296 }
1297 }
1298
1299 pub fn unused() -> Self {
1301 global_streaming_metrics(MetricLevel::Disabled)
1302 }
1303
1304 pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics {
1305 let label_list: &[&str; 1] = &[&actor_id.to_string()];
1306 let actor_execution_time = self
1307 .actor_execution_time
1308 .with_guarded_label_values(label_list);
1309 let actor_scheduled_duration = self
1310 .actor_scheduled_duration
1311 .with_guarded_label_values(label_list);
1312 let actor_scheduled_cnt = self
1313 .actor_scheduled_cnt
1314 .with_guarded_label_values(label_list);
1315 let actor_fast_poll_duration = self
1316 .actor_fast_poll_duration
1317 .with_guarded_label_values(label_list);
1318 let actor_fast_poll_cnt = self
1319 .actor_fast_poll_cnt
1320 .with_guarded_label_values(label_list);
1321 let actor_slow_poll_duration = self
1322 .actor_slow_poll_duration
1323 .with_guarded_label_values(label_list);
1324 let actor_slow_poll_cnt = self
1325 .actor_slow_poll_cnt
1326 .with_guarded_label_values(label_list);
1327 let actor_poll_duration = self
1328 .actor_poll_duration
1329 .with_guarded_label_values(label_list);
1330 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1331 let actor_idle_duration = self
1332 .actor_idle_duration
1333 .with_guarded_label_values(label_list);
1334 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1335 ActorMetrics {
1336 actor_execution_time,
1337 actor_scheduled_duration,
1338 actor_scheduled_cnt,
1339 actor_fast_poll_duration,
1340 actor_fast_poll_cnt,
1341 actor_slow_poll_duration,
1342 actor_slow_poll_cnt,
1343 actor_poll_duration,
1344 actor_poll_cnt,
1345 actor_idle_duration,
1346 actor_idle_cnt,
1347 }
1348 }
1349
1350 pub(crate) fn new_actor_input_metrics(
1351 &self,
1352 actor_id: ActorId,
1353 fragment_id: FragmentId,
1354 upstream_fragment_id: FragmentId,
1355 ) -> ActorInputMetrics {
1356 let actor_id_str = actor_id.to_string();
1357 let fragment_id_str = fragment_id.to_string();
1358 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1359 ActorInputMetrics {
1360 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1361 &actor_id_str,
1362 &fragment_id_str,
1363 &upstream_fragment_id_str,
1364 ]),
1365 actor_input_buffer_blocking_duration_ns: self
1366 .actor_input_buffer_blocking_duration_ns
1367 .with_guarded_label_values(&[
1368 &actor_id_str,
1369 &fragment_id_str,
1370 &upstream_fragment_id_str,
1371 ]),
1372 }
1373 }
1374
1375 pub fn new_sink_exec_metrics(
1376 &self,
1377 id: SinkId,
1378 actor_id: ActorId,
1379 fragment_id: FragmentId,
1380 ) -> SinkExecutorMetrics {
1381 let label_list: &[&str; 3] = &[
1382 &id.to_string(),
1383 &actor_id.to_string(),
1384 &fragment_id.to_string(),
1385 ];
1386 SinkExecutorMetrics {
1387 sink_input_row_count: self
1388 .sink_input_row_count
1389 .with_guarded_label_values(label_list),
1390 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1391 sink_chunk_buffer_size: self
1392 .sink_chunk_buffer_size
1393 .with_guarded_label_values(label_list),
1394 }
1395 }
1396
1397 pub fn new_group_top_n_metrics(
1398 &self,
1399 table_id: u32,
1400 actor_id: ActorId,
1401 fragment_id: FragmentId,
1402 ) -> GroupTopNMetrics {
1403 let label_list: &[&str; 3] = &[
1404 &table_id.to_string(),
1405 &actor_id.to_string(),
1406 &fragment_id.to_string(),
1407 ];
1408
1409 GroupTopNMetrics {
1410 group_top_n_cache_miss_count: self
1411 .group_top_n_cache_miss_count
1412 .with_guarded_label_values(label_list),
1413 group_top_n_total_query_cache_count: self
1414 .group_top_n_total_query_cache_count
1415 .with_guarded_label_values(label_list),
1416 group_top_n_cached_entry_count: self
1417 .group_top_n_cached_entry_count
1418 .with_guarded_label_values(label_list),
1419 }
1420 }
1421
1422 pub fn new_append_only_group_top_n_metrics(
1423 &self,
1424 table_id: u32,
1425 actor_id: ActorId,
1426 fragment_id: FragmentId,
1427 ) -> GroupTopNMetrics {
1428 let label_list: &[&str; 3] = &[
1429 &table_id.to_string(),
1430 &actor_id.to_string(),
1431 &fragment_id.to_string(),
1432 ];
1433
1434 GroupTopNMetrics {
1435 group_top_n_cache_miss_count: self
1436 .group_top_n_appendonly_cache_miss_count
1437 .with_guarded_label_values(label_list),
1438 group_top_n_total_query_cache_count: self
1439 .group_top_n_appendonly_total_query_cache_count
1440 .with_guarded_label_values(label_list),
1441 group_top_n_cached_entry_count: self
1442 .group_top_n_appendonly_cached_entry_count
1443 .with_guarded_label_values(label_list),
1444 }
1445 }
1446
1447 pub fn new_lookup_executor_metrics(
1448 &self,
1449 table_id: TableId,
1450 actor_id: ActorId,
1451 fragment_id: FragmentId,
1452 ) -> LookupExecutorMetrics {
1453 let label_list: &[&str; 3] = &[
1454 &table_id.to_string(),
1455 &actor_id.to_string(),
1456 &fragment_id.to_string(),
1457 ];
1458
1459 LookupExecutorMetrics {
1460 lookup_cache_miss_count: self
1461 .lookup_cache_miss_count
1462 .with_guarded_label_values(label_list),
1463 lookup_total_query_cache_count: self
1464 .lookup_total_query_cache_count
1465 .with_guarded_label_values(label_list),
1466 lookup_cached_entry_count: self
1467 .lookup_cached_entry_count
1468 .with_guarded_label_values(label_list),
1469 }
1470 }
1471
1472 pub fn new_hash_agg_metrics(
1473 &self,
1474 table_id: u32,
1475 actor_id: ActorId,
1476 fragment_id: FragmentId,
1477 ) -> HashAggMetrics {
1478 let label_list: &[&str; 3] = &[
1479 &table_id.to_string(),
1480 &actor_id.to_string(),
1481 &fragment_id.to_string(),
1482 ];
1483 HashAggMetrics {
1484 agg_lookup_miss_count: self
1485 .agg_lookup_miss_count
1486 .with_guarded_label_values(label_list),
1487 agg_total_lookup_count: self
1488 .agg_total_lookup_count
1489 .with_guarded_label_values(label_list),
1490 agg_cached_entry_count: self
1491 .agg_cached_entry_count
1492 .with_guarded_label_values(label_list),
1493 agg_chunk_lookup_miss_count: self
1494 .agg_chunk_lookup_miss_count
1495 .with_guarded_label_values(label_list),
1496 agg_chunk_total_lookup_count: self
1497 .agg_chunk_total_lookup_count
1498 .with_guarded_label_values(label_list),
1499 agg_dirty_groups_count: self
1500 .agg_dirty_groups_count
1501 .with_guarded_label_values(label_list),
1502 agg_dirty_groups_heap_size: self
1503 .agg_dirty_groups_heap_size
1504 .with_guarded_label_values(label_list),
1505 agg_state_cache_lookup_count: self
1506 .agg_state_cache_lookup_count
1507 .with_guarded_label_values(label_list),
1508 agg_state_cache_miss_count: self
1509 .agg_state_cache_miss_count
1510 .with_guarded_label_values(label_list),
1511 }
1512 }
1513
1514 pub fn new_agg_distinct_dedup_metrics(
1515 &self,
1516 table_id: u32,
1517 actor_id: ActorId,
1518 fragment_id: FragmentId,
1519 ) -> AggDistinctDedupMetrics {
1520 let label_list: &[&str; 3] = &[
1521 &table_id.to_string(),
1522 &actor_id.to_string(),
1523 &fragment_id.to_string(),
1524 ];
1525 AggDistinctDedupMetrics {
1526 agg_distinct_cache_miss_count: self
1527 .agg_distinct_cache_miss_count
1528 .with_guarded_label_values(label_list),
1529 agg_distinct_total_cache_count: self
1530 .agg_distinct_total_cache_count
1531 .with_guarded_label_values(label_list),
1532 agg_distinct_cached_entry_count: self
1533 .agg_distinct_cached_entry_count
1534 .with_guarded_label_values(label_list),
1535 }
1536 }
1537
1538 pub fn new_temporal_join_metrics(
1539 &self,
1540 table_id: TableId,
1541 actor_id: ActorId,
1542 fragment_id: FragmentId,
1543 ) -> TemporalJoinMetrics {
1544 let label_list: &[&str; 3] = &[
1545 &table_id.to_string(),
1546 &actor_id.to_string(),
1547 &fragment_id.to_string(),
1548 ];
1549 TemporalJoinMetrics {
1550 temporal_join_cache_miss_count: self
1551 .temporal_join_cache_miss_count
1552 .with_guarded_label_values(label_list),
1553 temporal_join_total_query_cache_count: self
1554 .temporal_join_total_query_cache_count
1555 .with_guarded_label_values(label_list),
1556 temporal_join_cached_entry_count: self
1557 .temporal_join_cached_entry_count
1558 .with_guarded_label_values(label_list),
1559 }
1560 }
1561
1562 pub fn new_backfill_metrics(&self, table_id: u32, actor_id: ActorId) -> BackfillMetrics {
1563 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1564 BackfillMetrics {
1565 backfill_snapshot_read_row_count: self
1566 .backfill_snapshot_read_row_count
1567 .with_guarded_label_values(label_list),
1568 backfill_upstream_output_row_count: self
1569 .backfill_upstream_output_row_count
1570 .with_guarded_label_values(label_list),
1571 }
1572 }
1573
1574 pub fn new_cdc_backfill_metrics(
1575 &self,
1576 table_id: TableId,
1577 actor_id: ActorId,
1578 ) -> CdcBackfillMetrics {
1579 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1580 CdcBackfillMetrics {
1581 cdc_backfill_snapshot_read_row_count: self
1582 .cdc_backfill_snapshot_read_row_count
1583 .with_guarded_label_values(label_list),
1584 cdc_backfill_upstream_output_row_count: self
1585 .cdc_backfill_upstream_output_row_count
1586 .with_guarded_label_values(label_list),
1587 }
1588 }
1589
1590 pub fn new_over_window_metrics(
1591 &self,
1592 table_id: u32,
1593 actor_id: ActorId,
1594 fragment_id: FragmentId,
1595 ) -> OverWindowMetrics {
1596 let label_list: &[&str; 3] = &[
1597 &table_id.to_string(),
1598 &actor_id.to_string(),
1599 &fragment_id.to_string(),
1600 ];
1601 OverWindowMetrics {
1602 over_window_cached_entry_count: self
1603 .over_window_cached_entry_count
1604 .with_guarded_label_values(label_list),
1605 over_window_cache_lookup_count: self
1606 .over_window_cache_lookup_count
1607 .with_guarded_label_values(label_list),
1608 over_window_cache_miss_count: self
1609 .over_window_cache_miss_count
1610 .with_guarded_label_values(label_list),
1611 over_window_range_cache_entry_count: self
1612 .over_window_range_cache_entry_count
1613 .with_guarded_label_values(label_list),
1614 over_window_range_cache_lookup_count: self
1615 .over_window_range_cache_lookup_count
1616 .with_guarded_label_values(label_list),
1617 over_window_range_cache_left_miss_count: self
1618 .over_window_range_cache_left_miss_count
1619 .with_guarded_label_values(label_list),
1620 over_window_range_cache_right_miss_count: self
1621 .over_window_range_cache_right_miss_count
1622 .with_guarded_label_values(label_list),
1623 over_window_accessed_entry_count: self
1624 .over_window_accessed_entry_count
1625 .with_guarded_label_values(label_list),
1626 over_window_compute_count: self
1627 .over_window_compute_count
1628 .with_guarded_label_values(label_list),
1629 over_window_same_output_count: self
1630 .over_window_same_output_count
1631 .with_guarded_label_values(label_list),
1632 }
1633 }
1634
1635 pub fn new_materialize_metrics(
1636 &self,
1637 table_id: TableId,
1638 actor_id: ActorId,
1639 fragment_id: FragmentId,
1640 ) -> MaterializeMetrics {
1641 let label_list: &[&str; 3] = &[
1642 &actor_id.to_string(),
1643 &table_id.to_string(),
1644 &fragment_id.to_string(),
1645 ];
1646 MaterializeMetrics {
1647 materialize_cache_hit_count: self
1648 .materialize_cache_hit_count
1649 .with_guarded_label_values(label_list),
1650 materialize_cache_total_count: self
1651 .materialize_cache_total_count
1652 .with_guarded_label_values(label_list),
1653 materialize_input_row_count: self
1654 .materialize_input_row_count
1655 .with_guarded_label_values(label_list),
1656 materialize_current_epoch: self
1657 .materialize_current_epoch
1658 .with_guarded_label_values(label_list),
1659 }
1660 }
1661}
1662
/// Per-input-edge metric handles of an actor.
///
/// Built by `StreamingMetrics::new_actor_input_metrics`, which labels both
/// handles `[actor_id, fragment_id, upstream_fragment_id]`.
pub(crate) struct ActorInputMetrics {
    /// Integer counter of records received on this input edge.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter<3>,
    /// Integer counter of time spent blocked on the input buffer.
    /// NOTE(review): the `_ns` suffix suggests nanoseconds; confirm at the call site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
}
1667
/// Per-actor scheduling / polling metric handles.
///
/// Built by `StreamingMetrics::new_actor_metrics`; every handle carries a single
/// label, the actor id. `*_duration` / `*_time` fields are float gauges
/// (`LabelGuardedGauge`) and `*_cnt` fields are integer gauges.
/// NOTE(review): units of the duration gauges are not visible here; confirm at
/// the updating call site.
pub struct ActorMetrics {
    pub actor_execution_time: LabelGuardedGauge<1>,
    pub actor_scheduled_duration: LabelGuardedGauge<1>,
    pub actor_scheduled_cnt: LabelGuardedIntGauge<1>,
    pub actor_fast_poll_duration: LabelGuardedGauge<1>,
    pub actor_fast_poll_cnt: LabelGuardedIntGauge<1>,
    pub actor_slow_poll_duration: LabelGuardedGauge<1>,
    pub actor_slow_poll_cnt: LabelGuardedIntGauge<1>,
    pub actor_poll_duration: LabelGuardedGauge<1>,
    pub actor_poll_cnt: LabelGuardedIntGauge<1>,
    pub actor_idle_duration: LabelGuardedGauge<1>,
    pub actor_idle_cnt: LabelGuardedIntGauge<1>,
}
1682
/// Metric handles of a sink executor.
///
/// Built by `StreamingMetrics::new_sink_exec_metrics`, labelled
/// `[sink_id, actor_id, fragment_id]`.
pub struct SinkExecutorMetrics {
    /// Integer counter of rows fed into the sink.
    pub sink_input_row_count: LabelGuardedIntCounter<3>,
    /// Integer counter of bytes fed into the sink.
    pub sink_input_bytes: LabelGuardedIntCounter<3>,
    /// Integer gauge of the sink's chunk buffer size.
    /// NOTE(review): whether this is rows, chunks or bytes is not visible here.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge<3>,
}
1688
/// Metric handles of a materialize executor.
///
/// Built by `StreamingMetrics::new_materialize_metrics`, labelled
/// `[actor_id, table_id, fragment_id]`. Note this actor-first order, which
/// differs from most other executor metrics in this file.
pub struct MaterializeMetrics {
    /// Integer counter of materialize cache hits.
    pub materialize_cache_hit_count: LabelGuardedIntCounter<3>,
    /// Integer counter of all materialize cache operations (hits and misses).
    pub materialize_cache_total_count: LabelGuardedIntCounter<3>,
    /// Integer counter of rows received by the materialize executor.
    pub materialize_input_row_count: LabelGuardedIntCounter<3>,
    /// Integer gauge holding the executor's current epoch.
    pub materialize_current_epoch: LabelGuardedIntGauge<3>,
}
1695
/// Cache metric handles of a group top-n executor.
///
/// Shared by the regular and the append-only variants: it is built by both
/// `new_group_top_n_metrics` and `new_append_only_group_top_n_metrics`, each
/// backed by its own metric families. Labels are `[table_id, actor_id, fragment_id]`.
pub struct GroupTopNMetrics {
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter<3>,
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter<3>,
    /// Integer gauge of entries currently held in the cache.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge<3>,
}
1701
/// Cache metric handles of a lookup executor.
///
/// Built by `StreamingMetrics::new_lookup_executor_metrics`, labelled
/// `[table_id, actor_id, fragment_id]`.
pub struct LookupExecutorMetrics {
    pub lookup_cache_miss_count: LabelGuardedIntCounter<3>,
    pub lookup_total_query_cache_count: LabelGuardedIntCounter<3>,
    /// Integer gauge of entries currently held in the lookup cache.
    pub lookup_cached_entry_count: LabelGuardedIntGauge<3>,
}
1707
/// Metric handles of a hash aggregation executor.
///
/// Built by `StreamingMetrics::new_hash_agg_metrics`, labelled
/// `[table_id, actor_id, fragment_id]`. Counters are `LabelGuardedIntCounter`;
/// the entry-count and dirty-group fields are `LabelGuardedIntGauge`.
pub struct HashAggMetrics {
    // Group-level lookup cache.
    pub agg_lookup_miss_count: LabelGuardedIntCounter<3>,
    pub agg_total_lookup_count: LabelGuardedIntCounter<3>,
    pub agg_cached_entry_count: LabelGuardedIntGauge<3>,
    // Chunk-level lookups.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter<3>,
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter<3>,
    // Dirty groups awaiting flush (presumably; confirm at the executor).
    pub agg_dirty_groups_count: LabelGuardedIntGauge<3>,
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge<3>,
    // Aggregation state cache.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter<3>,
    pub agg_state_cache_miss_count: LabelGuardedIntCounter<3>,
}
1719
/// Cache metric handles of the distinct-aggregation deduplicator.
///
/// Built by `StreamingMetrics::new_agg_distinct_dedup_metrics`, labelled
/// `[table_id, actor_id, fragment_id]`.
pub struct AggDistinctDedupMetrics {
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter<3>,
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter<3>,
    /// Integer gauge of entries currently held in the dedup cache.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge<3>,
}
1725
/// Cache metric handles of a temporal join executor.
///
/// Built by `StreamingMetrics::new_temporal_join_metrics`, labelled
/// `[table_id, actor_id, fragment_id]`.
pub struct TemporalJoinMetrics {
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter<3>,
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter<3>,
    /// Integer gauge of entries currently held in the join cache.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge<3>,
}
1731
/// Row-count metric handles of a backfill executor.
///
/// Built by `StreamingMetrics::new_backfill_metrics`, labelled `[table_id, actor_id]`
/// (no fragment label).
pub struct BackfillMetrics {
    /// Integer counter of rows read from the snapshot.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter<2>,
    /// Integer counter of upstream rows emitted.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter<2>,
}
1736
/// Row-count metric handles of a CDC backfill executor.
///
/// Built by `StreamingMetrics::new_cdc_backfill_metrics`, labelled
/// `[table_id, actor_id]` (no fragment label).
pub struct CdcBackfillMetrics {
    /// Integer counter of rows read from the snapshot.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter<2>,
    /// Integer counter of upstream rows emitted.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter<2>,
}
1741
/// Metric handles of an over-window executor.
///
/// Built by `StreamingMetrics::new_over_window_metrics`, labelled
/// `[table_id, actor_id, fragment_id]`. Entry-count fields are integer gauges;
/// the rest are integer counters.
pub struct OverWindowMetrics {
    // Partition cache.
    pub over_window_cached_entry_count: LabelGuardedIntGauge<3>,
    pub over_window_cache_lookup_count: LabelGuardedIntCounter<3>,
    pub over_window_cache_miss_count: LabelGuardedIntCounter<3>,
    // Range cache; misses are tracked separately for the left and right ends.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge<3>,
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter<3>,
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter<3>,
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter<3>,
    // Window computation.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter<3>,
    pub over_window_compute_count: LabelGuardedIntCounter<3>,
    pub over_window_same_output_count: LabelGuardedIntCounter<3>,
}