1use std::sync::OnceLock;
16
17use prometheus::{
18 Histogram, IntCounter, IntGauge, Registry, exponential_buckets, histogram_opts,
19 register_histogram_with_registry, register_int_counter_with_registry,
20 register_int_gauge_with_registry,
21};
22use risingwave_common::catalog::TableId;
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 LabelGuardedHistogramVec, LabelGuardedIntCounter, LabelGuardedIntCounterVec,
26 LabelGuardedIntGauge, LabelGuardedIntGaugeVec, MetricVecRelabelExt,
27 RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::monitor::in_mem::CountMap;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::sink::catalog::SinkId;
36use risingwave_pb::id::ExecutorId;
37
38use crate::common::log_store_impl::kv_log_store::{
39 REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY,
40};
41use crate::executor::prelude::ActorId;
42use crate::task::FragmentId;
43
44#[derive(Clone)]
45pub struct StreamingMetrics {
46 pub level: MetricLevel,
47
48 pub executor_row_count: RelabeledGuardedIntCounterVec,
50
51 pub mem_stream_node_output_row_count: CountMap<ExecutorId>,
55 pub mem_stream_node_output_blocking_duration_ns: CountMap<ExecutorId>,
56
57 actor_scheduled_duration: RelabeledGuardedIntCounterVec,
59 actor_scheduled_cnt: RelabeledGuardedIntCounterVec,
60 actor_poll_duration: RelabeledGuardedIntCounterVec,
61 actor_poll_cnt: RelabeledGuardedIntCounterVec,
62 actor_idle_duration: RelabeledGuardedIntCounterVec,
63 actor_idle_cnt: RelabeledGuardedIntCounterVec,
64
65 pub actor_count: LabelGuardedIntGaugeVec,
67 pub actor_in_record_cnt: RelabeledGuardedIntCounterVec,
68 pub actor_out_record_cnt: RelabeledGuardedIntCounterVec,
69 pub fragment_channel_buffered_bytes: LabelGuardedIntGaugeVec,
70 pub actor_current_epoch: RelabeledGuardedIntGaugeVec,
71
72 pub source_output_row_count: LabelGuardedIntCounterVec,
74 pub source_split_change_count: LabelGuardedIntCounterVec,
75 pub source_backfill_row_count: LabelGuardedIntCounterVec,
76
77 sink_input_row_count: LabelGuardedIntCounterVec,
79 sink_input_bytes: LabelGuardedIntCounterVec,
80 sink_chunk_buffer_size: LabelGuardedIntGaugeVec,
81
82 pub exchange_frag_recv_size: LabelGuardedIntCounterVec,
84
85 pub merge_barrier_align_duration: RelabeledGuardedIntCounterVec,
88
89 pub actor_output_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
91 actor_input_buffer_blocking_duration_ns: RelabeledGuardedIntCounterVec,
92
93 pub join_lookup_miss_count: LabelGuardedIntCounterVec,
95 pub join_lookup_total_count: LabelGuardedIntCounterVec,
96 pub join_insert_cache_miss_count: LabelGuardedIntCounterVec,
97 pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec,
98 pub join_match_duration_ns: LabelGuardedIntCounterVec,
99 pub join_cached_entry_count: LabelGuardedIntGaugeVec,
100 pub join_matched_join_keys: RelabeledGuardedHistogramVec,
101
102 pub barrier_align_duration: RelabeledGuardedIntCounterVec,
104
105 agg_lookup_miss_count: LabelGuardedIntCounterVec,
107 agg_total_lookup_count: LabelGuardedIntCounterVec,
108 agg_cached_entry_count: LabelGuardedIntGaugeVec,
109 agg_chunk_lookup_miss_count: LabelGuardedIntCounterVec,
110 agg_chunk_total_lookup_count: LabelGuardedIntCounterVec,
111 agg_dirty_groups_count: LabelGuardedIntGaugeVec,
112 agg_dirty_groups_heap_size: LabelGuardedIntGaugeVec,
113 agg_distinct_cache_miss_count: LabelGuardedIntCounterVec,
114 agg_distinct_total_cache_count: LabelGuardedIntCounterVec,
115 agg_distinct_cached_entry_count: LabelGuardedIntGaugeVec,
116 agg_state_cache_lookup_count: LabelGuardedIntCounterVec,
117 agg_state_cache_miss_count: LabelGuardedIntCounterVec,
118
119 group_top_n_cache_miss_count: LabelGuardedIntCounterVec,
121 group_top_n_total_query_cache_count: LabelGuardedIntCounterVec,
122 group_top_n_cached_entry_count: LabelGuardedIntGaugeVec,
123 group_top_n_appendonly_cache_miss_count: LabelGuardedIntCounterVec,
125 group_top_n_appendonly_total_query_cache_count: LabelGuardedIntCounterVec,
126 group_top_n_appendonly_cached_entry_count: LabelGuardedIntGaugeVec,
127
128 lookup_cache_miss_count: LabelGuardedIntCounterVec,
130 lookup_total_query_cache_count: LabelGuardedIntCounterVec,
131 lookup_cached_entry_count: LabelGuardedIntGaugeVec,
132
133 temporal_join_cache_miss_count: LabelGuardedIntCounterVec,
135 temporal_join_total_query_cache_count: LabelGuardedIntCounterVec,
136 temporal_join_cached_entry_count: LabelGuardedIntGaugeVec,
137
138 backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
140 backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
141
142 cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounterVec,
144 cdc_backfill_upstream_output_row_count: LabelGuardedIntCounterVec,
145
146 pub(crate) snapshot_backfill_consume_row_count: LabelGuardedIntCounterVec,
148
149 over_window_cached_entry_count: LabelGuardedIntGaugeVec,
151 over_window_cache_lookup_count: LabelGuardedIntCounterVec,
152 over_window_cache_miss_count: LabelGuardedIntCounterVec,
153 over_window_range_cache_entry_count: LabelGuardedIntGaugeVec,
154 over_window_range_cache_lookup_count: LabelGuardedIntCounterVec,
155 over_window_range_cache_left_miss_count: LabelGuardedIntCounterVec,
156 over_window_range_cache_right_miss_count: LabelGuardedIntCounterVec,
157 over_window_accessed_entry_count: LabelGuardedIntCounterVec,
158 over_window_compute_count: LabelGuardedIntCounterVec,
159 over_window_same_output_count: LabelGuardedIntCounterVec,
160
161 pub barrier_inflight_latency: Histogram,
165 pub barrier_sync_latency: Histogram,
167 pub barrier_batch_size: Histogram,
168 pub barrier_manager_progress: IntCounter,
170
171 pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
172 pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
173 pub kv_log_store_rewind_count: LabelGuardedIntCounterVec,
174 pub kv_log_store_rewind_delay: LabelGuardedHistogramVec,
175 pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec,
176 pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec,
177 pub kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
178 pub kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
179 pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
180 pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
181 pub kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
182
183 pub crossdb_last_consumed_min_epoch: LabelGuardedIntGaugeVec,
184
185 pub sync_kv_log_store_read_count: LabelGuardedIntCounterVec,
186 pub sync_kv_log_store_read_size: LabelGuardedIntCounterVec,
187 pub sync_kv_log_store_write_pause_duration_ns: LabelGuardedIntCounterVec,
188 pub sync_kv_log_store_state: LabelGuardedIntCounterVec,
189 pub sync_kv_log_store_wait_next_poll_ns: LabelGuardedIntCounterVec,
190 pub sync_kv_log_store_storage_write_count: LabelGuardedIntCounterVec,
191 pub sync_kv_log_store_storage_write_size: LabelGuardedIntCounterVec,
192 pub sync_kv_log_store_buffer_unconsumed_item_count: LabelGuardedIntGaugeVec,
193 pub sync_kv_log_store_buffer_unconsumed_row_count: LabelGuardedIntGaugeVec,
194 pub sync_kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec,
195 pub sync_kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec,
196 pub sync_kv_log_store_buffer_memory_bytes: LabelGuardedIntGaugeVec,
197
198 pub lru_runtime_loop_count: IntCounter,
200 pub lru_latest_sequence: IntGauge,
201 pub lru_watermark_sequence: IntGauge,
202 pub lru_eviction_policy: IntGauge,
203 pub jemalloc_allocated_bytes: IntGauge,
204 pub jemalloc_active_bytes: IntGauge,
205 pub jemalloc_resident_bytes: IntGauge,
206 pub jemalloc_metadata_bytes: IntGauge,
207 pub jvm_allocated_bytes: IntGauge,
208 pub jvm_active_bytes: IntGauge,
209 pub stream_memory_usage: RelabeledGuardedIntGaugeVec,
210
211 materialize_cache_hit_count: RelabeledGuardedIntCounterVec,
213 materialize_data_exist_count: RelabeledGuardedIntCounterVec,
214 materialize_cache_total_count: RelabeledGuardedIntCounterVec,
215 materialize_input_row_count: RelabeledGuardedIntCounterVec,
216 pub materialize_current_epoch: RelabeledGuardedIntGaugeVec,
217
218 pub pg_cdc_state_table_lsn: LabelGuardedIntGaugeVec,
220 pub pg_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
221
222 pub mysql_cdc_state_binlog_file_seq: LabelGuardedIntGaugeVec,
224 pub mysql_cdc_state_binlog_position: LabelGuardedIntGaugeVec,
225
226 pub sqlserver_cdc_state_change_lsn: LabelGuardedIntGaugeVec,
228 pub sqlserver_cdc_state_commit_lsn: LabelGuardedIntGaugeVec,
229 pub sqlserver_cdc_jni_commit_offset_lsn: LabelGuardedIntGaugeVec,
230
231 pub gap_fill_generated_rows_count: RelabeledGuardedIntCounterVec,
233
234 pub state_table_iter_count: RelabeledGuardedIntCounterVec,
236 pub state_table_get_count: RelabeledGuardedIntCounterVec,
237 pub state_table_iter_vnode_pruned_count: RelabeledGuardedIntCounterVec,
238 pub state_table_get_vnode_pruned_count: RelabeledGuardedIntCounterVec,
239}
240
241pub static GLOBAL_STREAMING_METRICS: OnceLock<StreamingMetrics> = OnceLock::new();
242
243pub fn global_streaming_metrics(metric_level: MetricLevel) -> StreamingMetrics {
244 GLOBAL_STREAMING_METRICS
245 .get_or_init(|| StreamingMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
246 .clone()
247}
248
249impl StreamingMetrics {
250 pub fn new(registry: &Registry, level: MetricLevel) -> Self {
251 let executor_row_count = register_guarded_int_counter_vec_with_registry!(
252 "stream_executor_row_count",
253 "Total number of rows that have been output from each executor",
254 &["actor_id", "fragment_id", "executor_identity"],
255 registry
256 )
257 .unwrap()
258 .relabel_debug_1(level);
259
260 let stream_node_output_row_count = CountMap::new();
261 let stream_node_output_blocking_duration_ns = CountMap::new();
262
263 let source_output_row_count = register_guarded_int_counter_vec_with_registry!(
264 "stream_source_output_rows_counts",
265 "Total number of rows that have been output from source",
266 &["source_id", "source_name", "actor_id", "fragment_id"],
267 registry
268 )
269 .unwrap();
270
271 let source_split_change_count = register_guarded_int_counter_vec_with_registry!(
272 "stream_source_split_change_event_count",
273 "Total number of split change events that have been operated by source",
274 &["source_id", "source_name", "actor_id", "fragment_id"],
275 registry
276 )
277 .unwrap();
278
279 let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
280 "stream_source_backfill_rows_counts",
281 "Total number of rows that have been backfilled for source",
282 &["source_id", "source_name", "actor_id", "fragment_id"],
283 registry
284 )
285 .unwrap();
286
287 let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
288 "stream_sink_input_row_count",
289 "Total number of rows streamed into sink executors",
290 &["sink_id", "actor_id", "fragment_id"],
291 registry
292 )
293 .unwrap();
294
295 let sink_input_bytes = register_guarded_int_counter_vec_with_registry!(
296 "stream_sink_input_bytes",
297 "Total size of chunks streamed into sink executors",
298 &["sink_id", "actor_id", "fragment_id"],
299 registry
300 )
301 .unwrap();
302
303 let materialize_input_row_count = register_guarded_int_counter_vec_with_registry!(
304 "stream_mview_input_row_count",
305 "Total number of rows streamed into materialize executors",
306 &["actor_id", "table_id", "fragment_id"],
307 registry
308 )
309 .unwrap()
310 .relabel_debug_1(level);
311
312 let materialize_current_epoch = register_guarded_int_gauge_vec_with_registry!(
313 "stream_mview_current_epoch",
314 "The current epoch of the materialized executor",
315 &["actor_id", "table_id", "fragment_id"],
316 registry
317 )
318 .unwrap()
319 .relabel_debug_1(level);
320
321 let pg_cdc_state_table_lsn = register_guarded_int_gauge_vec_with_registry!(
322 "stream_pg_cdc_state_table_lsn",
323 "Current LSN value stored in PostgreSQL CDC state table",
324 &["source_id"],
325 registry,
326 )
327 .unwrap();
328
329 let pg_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
330 "stream_pg_cdc_jni_commit_offset_lsn",
331 "LSN value when JNI commit offset is called for PostgreSQL CDC",
332 &["source_id"],
333 registry,
334 )
335 .unwrap();
336
337 let mysql_cdc_state_binlog_file_seq = register_guarded_int_gauge_vec_with_registry!(
338 "stream_mysql_cdc_state_binlog_file_seq",
339 "Current binlog file sequence number stored in MySQL CDC state table",
340 &["source_id"],
341 registry,
342 )
343 .unwrap();
344
345 let mysql_cdc_state_binlog_position = register_guarded_int_gauge_vec_with_registry!(
346 "stream_mysql_cdc_state_binlog_position",
347 "Current binlog position stored in MySQL CDC state table",
348 &["source_id"],
349 registry,
350 )
351 .unwrap();
352
353 let sqlserver_cdc_state_change_lsn = register_guarded_int_gauge_vec_with_registry!(
354 "stream_sqlserver_cdc_state_change_lsn",
355 "Current change_lsn value stored in SQL Server CDC state table",
356 &["source_id"],
357 registry,
358 )
359 .unwrap();
360
361 let sqlserver_cdc_state_commit_lsn = register_guarded_int_gauge_vec_with_registry!(
362 "stream_sqlserver_cdc_state_commit_lsn",
363 "Current commit_lsn value stored in SQL Server CDC state table",
364 &["source_id"],
365 registry,
366 )
367 .unwrap();
368
369 let sqlserver_cdc_jni_commit_offset_lsn = register_guarded_int_gauge_vec_with_registry!(
370 "stream_sqlserver_cdc_jni_commit_offset_lsn",
371 "LSN value when JNI commit offset is called for SQL Server CDC",
372 &["source_id"],
373 registry,
374 )
375 .unwrap();
376
377 let sink_chunk_buffer_size = register_guarded_int_gauge_vec_with_registry!(
378 "stream_sink_chunk_buffer_size",
379 "Total size of chunks buffered in a barrier",
380 &["sink_id", "actor_id", "fragment_id"],
381 registry
382 )
383 .unwrap();
384 let actor_output_buffer_blocking_duration_ns =
385 register_guarded_int_counter_vec_with_registry!(
386 "stream_actor_output_buffer_blocking_duration_ns",
387 "Total blocking duration (ns) of output buffer",
388 &["actor_id", "fragment_id", "downstream_fragment_id"],
389 registry
390 )
391 .unwrap()
392 .relabel_debug_1(level);
394
395 let actor_input_buffer_blocking_duration_ns =
396 register_guarded_int_counter_vec_with_registry!(
397 "stream_actor_input_buffer_blocking_duration_ns",
398 "Total blocking duration (ns) of input buffer",
399 &["actor_id", "fragment_id", "upstream_fragment_id"],
400 registry
401 )
402 .unwrap()
403 .relabel_debug_1(level);
405
406 let fragment_channel_buffered_bytes = register_guarded_int_gauge_vec_with_registry!(
407 "stream_fragment_channel_buffered_bytes",
408 "Estimated buffered bytes for actor channels by fragment",
409 &["fragment_id"],
410 registry
411 )
412 .unwrap();
413
414 let exchange_frag_recv_size = register_guarded_int_counter_vec_with_registry!(
415 "stream_exchange_frag_recv_size",
416 "Total size of messages that have been received from upstream Fragment",
417 &["up_fragment_id", "down_fragment_id"],
418 registry
419 )
420 .unwrap();
421
422 let actor_poll_duration = register_guarded_int_counter_vec_with_registry!(
423 "stream_actor_poll_duration",
424 "tokio's metrics",
425 &["actor_id", "fragment_id"],
426 registry
427 )
428 .unwrap()
429 .relabel_debug_1(level);
430
431 let actor_poll_cnt = register_guarded_int_counter_vec_with_registry!(
432 "stream_actor_poll_cnt",
433 "tokio's metrics",
434 &["actor_id", "fragment_id"],
435 registry
436 )
437 .unwrap()
438 .relabel_debug_1(level);
439
440 let actor_scheduled_duration = register_guarded_int_counter_vec_with_registry!(
441 "stream_actor_scheduled_duration",
442 "tokio's metrics",
443 &["actor_id", "fragment_id"],
444 registry
445 )
446 .unwrap()
447 .relabel_debug_1(level);
448
449 let actor_scheduled_cnt = register_guarded_int_counter_vec_with_registry!(
450 "stream_actor_scheduled_cnt",
451 "tokio's metrics",
452 &["actor_id", "fragment_id"],
453 registry
454 )
455 .unwrap()
456 .relabel_debug_1(level);
457
458 let actor_idle_duration = register_guarded_int_counter_vec_with_registry!(
459 "stream_actor_idle_duration",
460 "tokio's metrics",
461 &["actor_id", "fragment_id"],
462 registry
463 )
464 .unwrap()
465 .relabel_debug_1(level);
466
467 let actor_idle_cnt = register_guarded_int_counter_vec_with_registry!(
468 "stream_actor_idle_cnt",
469 "tokio's metrics",
470 &["actor_id", "fragment_id"],
471 registry
472 )
473 .unwrap()
474 .relabel_debug_1(level);
475
476 let actor_in_record_cnt = register_guarded_int_counter_vec_with_registry!(
477 "stream_actor_in_record_cnt",
478 "Total number of rows actor received",
479 &["actor_id", "fragment_id", "upstream_fragment_id"],
480 registry
481 )
482 .unwrap()
483 .relabel_debug_1(level);
484
485 let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
486 "stream_actor_out_record_cnt",
487 "Total number of rows actor sent",
488 &["actor_id", "fragment_id"],
489 registry
490 )
491 .unwrap()
492 .relabel_debug_1(level);
493
494 let actor_current_epoch = register_guarded_int_gauge_vec_with_registry!(
495 "stream_actor_current_epoch",
496 "Current epoch of actor",
497 &["actor_id", "fragment_id"],
498 registry
499 )
500 .unwrap()
501 .relabel_debug_1(level);
502
503 let actor_count = register_guarded_int_gauge_vec_with_registry!(
504 "stream_actor_count",
505 "Total number of actors (parallelism)",
506 &["fragment_id"],
507 registry
508 )
509 .unwrap();
510
511 let merge_barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
512 "stream_merge_barrier_align_duration_ns",
513 "Total merge barrier alignment duration (ns)",
514 &["actor_id", "fragment_id"],
515 registry
516 )
517 .unwrap()
518 .relabel_debug_1(level);
519
520 let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
521 "stream_join_lookup_miss_count",
522 "Join executor lookup miss duration",
523 &["side", "join_table_id", "actor_id", "fragment_id"],
524 registry
525 )
526 .unwrap();
527
528 let join_lookup_total_count = register_guarded_int_counter_vec_with_registry!(
529 "stream_join_lookup_total_count",
530 "Join executor lookup total operation",
531 &["side", "join_table_id", "actor_id", "fragment_id"],
532 registry
533 )
534 .unwrap();
535
536 let join_insert_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
537 "stream_join_insert_cache_miss_count",
538 "Join executor cache miss when insert operation",
539 &["side", "join_table_id", "actor_id", "fragment_id"],
540 registry
541 )
542 .unwrap();
543
544 let join_actor_input_waiting_duration_ns = register_guarded_int_counter_vec_with_registry!(
545 "stream_join_actor_input_waiting_duration_ns",
546 "Total waiting duration (ns) of input buffer of join actor",
547 &["actor_id", "fragment_id"],
548 registry
549 )
550 .unwrap();
551
552 let join_match_duration_ns = register_guarded_int_counter_vec_with_registry!(
553 "stream_join_match_duration_ns",
554 "Matching duration for each side",
555 &["actor_id", "fragment_id", "side"],
556 registry
557 )
558 .unwrap();
559
560 let barrier_align_duration = register_guarded_int_counter_vec_with_registry!(
561 "stream_barrier_align_duration_ns",
562 "Duration of join align barrier",
563 &["actor_id", "fragment_id", "wait_side", "executor"],
564 registry
565 )
566 .unwrap()
567 .relabel_debug_1(level);
568
569 let join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
570 "stream_join_cached_entry_count",
571 "Number of cached entries in streaming join operators",
572 &["actor_id", "fragment_id", "side"],
573 registry
574 )
575 .unwrap();
576
577 let join_matched_join_keys_opts = histogram_opts!(
578 "stream_join_matched_join_keys",
579 "The number of keys matched in the opposite side",
580 exponential_buckets(16.0, 2.0, 28).unwrap() );
582
583 let join_matched_join_keys = register_guarded_histogram_vec_with_registry!(
584 join_matched_join_keys_opts,
585 &["actor_id", "fragment_id", "table_id"],
586 registry
587 )
588 .unwrap()
589 .relabel_debug_1(level);
590
591 let agg_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
592 "stream_agg_lookup_miss_count",
593 "Aggregation executor lookup miss duration",
594 &["table_id", "actor_id", "fragment_id"],
595 registry
596 )
597 .unwrap();
598
599 let agg_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
600 "stream_agg_lookup_total_count",
601 "Aggregation executor lookup total operation",
602 &["table_id", "actor_id", "fragment_id"],
603 registry
604 )
605 .unwrap();
606
607 let agg_distinct_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
608 "stream_agg_distinct_cache_miss_count",
609 "Aggregation executor dinsinct miss duration",
610 &["table_id", "actor_id", "fragment_id"],
611 registry
612 )
613 .unwrap();
614
615 let agg_distinct_total_cache_count = register_guarded_int_counter_vec_with_registry!(
616 "stream_agg_distinct_total_cache_count",
617 "Aggregation executor distinct total operation",
618 &["table_id", "actor_id", "fragment_id"],
619 registry
620 )
621 .unwrap();
622
623 let agg_distinct_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
624 "stream_agg_distinct_cached_entry_count",
625 "Total entry counts in distinct aggregation executor cache",
626 &["table_id", "actor_id", "fragment_id"],
627 registry
628 )
629 .unwrap();
630
631 let agg_dirty_groups_count = register_guarded_int_gauge_vec_with_registry!(
632 "stream_agg_dirty_groups_count",
633 "Total dirty group counts in aggregation executor",
634 &["table_id", "actor_id", "fragment_id"],
635 registry
636 )
637 .unwrap();
638
639 let agg_dirty_groups_heap_size = register_guarded_int_gauge_vec_with_registry!(
640 "stream_agg_dirty_groups_heap_size",
641 "Total dirty group heap size in aggregation executor",
642 &["table_id", "actor_id", "fragment_id"],
643 registry
644 )
645 .unwrap();
646
647 let agg_state_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
648 "stream_agg_state_cache_lookup_count",
649 "Aggregation executor state cache lookup count",
650 &["table_id", "actor_id", "fragment_id"],
651 registry
652 )
653 .unwrap();
654
655 let agg_state_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
656 "stream_agg_state_cache_miss_count",
657 "Aggregation executor state cache miss count",
658 &["table_id", "actor_id", "fragment_id"],
659 registry
660 )
661 .unwrap();
662
663 let group_top_n_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
664 "stream_group_top_n_cache_miss_count",
665 "Group top n executor cache miss count",
666 &["table_id", "actor_id", "fragment_id"],
667 registry
668 )
669 .unwrap();
670
671 let group_top_n_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
672 "stream_group_top_n_total_query_cache_count",
673 "Group top n executor query cache total count",
674 &["table_id", "actor_id", "fragment_id"],
675 registry
676 )
677 .unwrap();
678
679 let group_top_n_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
680 "stream_group_top_n_cached_entry_count",
681 "Total entry counts in group top n executor cache",
682 &["table_id", "actor_id", "fragment_id"],
683 registry
684 )
685 .unwrap();
686
687 let group_top_n_appendonly_cache_miss_count =
688 register_guarded_int_counter_vec_with_registry!(
689 "stream_group_top_n_appendonly_cache_miss_count",
690 "Group top n appendonly executor cache miss count",
691 &["table_id", "actor_id", "fragment_id"],
692 registry
693 )
694 .unwrap();
695
696 let group_top_n_appendonly_total_query_cache_count =
697 register_guarded_int_counter_vec_with_registry!(
698 "stream_group_top_n_appendonly_total_query_cache_count",
699 "Group top n appendonly executor total cache count",
700 &["table_id", "actor_id", "fragment_id"],
701 registry
702 )
703 .unwrap();
704
705 let group_top_n_appendonly_cached_entry_count =
706 register_guarded_int_gauge_vec_with_registry!(
707 "stream_group_top_n_appendonly_cached_entry_count",
708 "Total entry counts in group top n appendonly executor cache",
709 &["table_id", "actor_id", "fragment_id"],
710 registry
711 )
712 .unwrap();
713
714 let lookup_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
715 "stream_lookup_cache_miss_count",
716 "Lookup executor cache miss count",
717 &["table_id", "actor_id", "fragment_id"],
718 registry
719 )
720 .unwrap();
721
722 let lookup_total_query_cache_count = register_guarded_int_counter_vec_with_registry!(
723 "stream_lookup_total_query_cache_count",
724 "Lookup executor query cache total count",
725 &["table_id", "actor_id", "fragment_id"],
726 registry
727 )
728 .unwrap();
729
730 let lookup_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
731 "stream_lookup_cached_entry_count",
732 "Total entry counts in lookup executor cache",
733 &["table_id", "actor_id", "fragment_id"],
734 registry
735 )
736 .unwrap();
737
738 let temporal_join_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
739 "stream_temporal_join_cache_miss_count",
740 "Temporal join executor cache miss count",
741 &["table_id", "actor_id", "fragment_id"],
742 registry
743 )
744 .unwrap();
745
746 let temporal_join_total_query_cache_count =
747 register_guarded_int_counter_vec_with_registry!(
748 "stream_temporal_join_total_query_cache_count",
749 "Temporal join executor query cache total count",
750 &["table_id", "actor_id", "fragment_id"],
751 registry
752 )
753 .unwrap();
754
755 let temporal_join_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
756 "stream_temporal_join_cached_entry_count",
757 "Total entry count in temporal join executor cache",
758 &["table_id", "actor_id", "fragment_id"],
759 registry
760 )
761 .unwrap();
762
763 let agg_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
764 "stream_agg_cached_entry_count",
765 "Number of cached keys in streaming aggregation operators",
766 &["table_id", "actor_id", "fragment_id"],
767 registry
768 )
769 .unwrap();
770
771 let agg_chunk_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
772 "stream_agg_chunk_lookup_miss_count",
773 "Aggregation executor chunk-level lookup miss duration",
774 &["table_id", "actor_id", "fragment_id"],
775 registry
776 )
777 .unwrap();
778
779 let agg_chunk_total_lookup_count = register_guarded_int_counter_vec_with_registry!(
780 "stream_agg_chunk_lookup_total_count",
781 "Aggregation executor chunk-level lookup total operation",
782 &["table_id", "actor_id", "fragment_id"],
783 registry
784 )
785 .unwrap();
786
787 let backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
788 "stream_backfill_snapshot_read_row_count",
789 "Total number of rows that have been read from the backfill snapshot",
790 &["table_id", "actor_id"],
791 registry
792 )
793 .unwrap();
794
795 let backfill_upstream_output_row_count = register_guarded_int_counter_vec_with_registry!(
796 "stream_backfill_upstream_output_row_count",
797 "Total number of rows that have been output from the backfill upstream",
798 &["table_id", "actor_id"],
799 registry
800 )
801 .unwrap();
802
803 let cdc_backfill_snapshot_read_row_count = register_guarded_int_counter_vec_with_registry!(
804 "stream_cdc_backfill_snapshot_read_row_count",
805 "Total number of rows that have been read from the cdc_backfill snapshot",
806 &["table_id", "actor_id"],
807 registry
808 )
809 .unwrap();
810
811 let cdc_backfill_upstream_output_row_count =
812 register_guarded_int_counter_vec_with_registry!(
813 "stream_cdc_backfill_upstream_output_row_count",
814 "Total number of rows that have been output from the cdc_backfill upstream",
815 &["table_id", "actor_id"],
816 registry
817 )
818 .unwrap();
819
820 let snapshot_backfill_consume_row_count = register_guarded_int_counter_vec_with_registry!(
821 "stream_snapshot_backfill_consume_snapshot_row_count",
822 "Total number of rows that have been output from snapshot backfill",
823 &["table_id", "actor_id", "stage"],
824 registry
825 )
826 .unwrap();
827
828 let over_window_cached_entry_count = register_guarded_int_gauge_vec_with_registry!(
829 "stream_over_window_cached_entry_count",
830 "Total entry (partition) count in over window executor cache",
831 &["table_id", "actor_id", "fragment_id"],
832 registry
833 )
834 .unwrap();
835
836 let over_window_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
837 "stream_over_window_cache_lookup_count",
838 "Over window executor cache lookup count",
839 &["table_id", "actor_id", "fragment_id"],
840 registry
841 )
842 .unwrap();
843
844 let over_window_cache_miss_count = register_guarded_int_counter_vec_with_registry!(
845 "stream_over_window_cache_miss_count",
846 "Over window executor cache miss count",
847 &["table_id", "actor_id", "fragment_id"],
848 registry
849 )
850 .unwrap();
851
852 let over_window_range_cache_entry_count = register_guarded_int_gauge_vec_with_registry!(
853 "stream_over_window_range_cache_entry_count",
854 "Over window partition range cache entry count",
855 &["table_id", "actor_id", "fragment_id"],
856 registry,
857 )
858 .unwrap();
859
860 let over_window_range_cache_lookup_count = register_guarded_int_counter_vec_with_registry!(
861 "stream_over_window_range_cache_lookup_count",
862 "Over window partition range cache lookup count",
863 &["table_id", "actor_id", "fragment_id"],
864 registry
865 )
866 .unwrap();
867
868 let over_window_range_cache_left_miss_count =
869 register_guarded_int_counter_vec_with_registry!(
870 "stream_over_window_range_cache_left_miss_count",
871 "Over window partition range cache left miss count",
872 &["table_id", "actor_id", "fragment_id"],
873 registry
874 )
875 .unwrap();
876
877 let over_window_range_cache_right_miss_count =
878 register_guarded_int_counter_vec_with_registry!(
879 "stream_over_window_range_cache_right_miss_count",
880 "Over window partition range cache right miss count",
881 &["table_id", "actor_id", "fragment_id"],
882 registry
883 )
884 .unwrap();
885
886 let over_window_accessed_entry_count = register_guarded_int_counter_vec_with_registry!(
887 "stream_over_window_accessed_entry_count",
888 "Over window accessed entry count",
889 &["table_id", "actor_id", "fragment_id"],
890 registry
891 )
892 .unwrap();
893
894 let over_window_compute_count = register_guarded_int_counter_vec_with_registry!(
895 "stream_over_window_compute_count",
896 "Over window compute count",
897 &["table_id", "actor_id", "fragment_id"],
898 registry
899 )
900 .unwrap();
901
902 let over_window_same_output_count = register_guarded_int_counter_vec_with_registry!(
903 "stream_over_window_same_output_count",
904 "Over window same output count",
905 &["table_id", "actor_id", "fragment_id"],
906 registry
907 )
908 .unwrap();
909
910 let opts = histogram_opts!(
911 "stream_barrier_inflight_duration_seconds",
912 "barrier_inflight_latency",
913 exponential_buckets(0.1, 1.5, 16).unwrap() );
915 let barrier_inflight_latency = register_histogram_with_registry!(opts, registry).unwrap();
916
917 let opts = histogram_opts!(
918 "stream_barrier_sync_storage_duration_seconds",
919 "barrier_sync_latency",
920 exponential_buckets(0.1, 1.5, 16).unwrap() );
922 let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();
923
924 let opts = histogram_opts!(
925 "stream_barrier_batch_size",
926 "barrier_batch_size",
927 exponential_buckets(1.0, 2.0, 8).unwrap()
928 );
929 let barrier_batch_size = register_histogram_with_registry!(opts, registry).unwrap();
930
931 let barrier_manager_progress = register_int_counter_with_registry!(
932 "stream_barrier_manager_progress",
933 "The number of actors that have processed the earliest in-flight barriers",
934 registry
935 )
936 .unwrap();
937
938 let sync_kv_log_store_wait_next_poll_ns = register_guarded_int_counter_vec_with_registry!(
939 "sync_kv_log_store_wait_next_poll_ns",
940 "Total duration (ns) of waiting for next poll",
941 &["actor_id", "target", "fragment_id", "relation"],
942 registry
943 )
944 .unwrap();
945
946 let sync_kv_log_store_read_count = register_guarded_int_counter_vec_with_registry!(
947 "sync_kv_log_store_read_count",
948 "read row count throughput of sync_kv log store",
949 &["type", "actor_id", "target", "fragment_id", "relation"],
950 registry
951 )
952 .unwrap();
953
954 let sync_kv_log_store_read_size = register_guarded_int_counter_vec_with_registry!(
955 "sync_kv_log_store_read_size",
956 "read size throughput of sync_kv log store",
957 &["type", "actor_id", "target", "fragment_id", "relation"],
958 registry
959 )
960 .unwrap();
961
962 let sync_kv_log_store_write_pause_duration_ns =
963 register_guarded_int_counter_vec_with_registry!(
964 "sync_kv_log_store_write_pause_duration_ns",
965 "Duration (ns) of sync_kv log store write pause",
966 &["actor_id", "target", "fragment_id", "relation"],
967 registry
968 )
969 .unwrap();
970
971 let sync_kv_log_store_state = register_guarded_int_counter_vec_with_registry!(
972 "sync_kv_log_store_state",
973 "clean/unclean state transition for sync_kv log store",
974 &["state", "actor_id", "target", "fragment_id", "relation"],
975 registry
976 )
977 .unwrap();
978
979 let sync_kv_log_store_storage_write_count =
980 register_guarded_int_counter_vec_with_registry!(
981 "sync_kv_log_store_storage_write_count",
982 "Write row count throughput of sync_kv log store",
983 &["actor_id", "target", "fragment_id", "relation"],
984 registry
985 )
986 .unwrap();
987
988 let sync_kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
989 "sync_kv_log_store_storage_write_size",
990 "Write size throughput of sync_kv log store",
991 &["actor_id", "target", "fragment_id", "relation"],
992 registry
993 )
994 .unwrap();
995
996 let sync_kv_log_store_buffer_unconsumed_item_count =
997 register_guarded_int_gauge_vec_with_registry!(
998 "sync_kv_log_store_buffer_unconsumed_item_count",
999 "Number of Unconsumed Item in buffer",
1000 &["actor_id", "target", "fragment_id", "relation"],
1001 registry
1002 )
1003 .unwrap();
1004
1005 let sync_kv_log_store_buffer_unconsumed_row_count =
1006 register_guarded_int_gauge_vec_with_registry!(
1007 "sync_kv_log_store_buffer_unconsumed_row_count",
1008 "Number of Unconsumed Row in buffer",
1009 &["actor_id", "target", "fragment_id", "relation"],
1010 registry
1011 )
1012 .unwrap();
1013
1014 let sync_kv_log_store_buffer_unconsumed_epoch_count =
1015 register_guarded_int_gauge_vec_with_registry!(
1016 "sync_kv_log_store_buffer_unconsumed_epoch_count",
1017 "Number of Unconsumed Epoch in buffer",
1018 &["actor_id", "target", "fragment_id", "relation"],
1019 registry
1020 )
1021 .unwrap();
1022
1023 let sync_kv_log_store_buffer_unconsumed_min_epoch =
1024 register_guarded_int_gauge_vec_with_registry!(
1025 "sync_kv_log_store_buffer_unconsumed_min_epoch",
1026 "Number of Unconsumed Epoch in buffer",
1027 &["actor_id", "target", "fragment_id", "relation"],
1028 registry
1029 )
1030 .unwrap();
1031 let sync_kv_log_store_buffer_memory_bytes =
1032 register_guarded_int_gauge_vec_with_registry!(
1033 "sync_kv_log_store_buffer_memory_bytes",
1034 "Estimated heap bytes used by synced kv log store buffer (unconsumed + consumed but not truncated)",
1035 &["actor_id", "target", "fragment_id", "relation"],
1036 registry
1037 )
1038 .unwrap();
1039
1040 let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!(
1041 "kv_log_store_storage_write_count",
1042 "Write row count throughput of kv log store",
1043 &["actor_id", "connector", "sink_id", "sink_name"],
1044 registry
1045 )
1046 .unwrap();
1047
1048 let kv_log_store_storage_write_size = register_guarded_int_counter_vec_with_registry!(
1049 "kv_log_store_storage_write_size",
1050 "Write size throughput of kv log store",
1051 &["actor_id", "connector", "sink_id", "sink_name"],
1052 registry
1053 )
1054 .unwrap();
1055
1056 let kv_log_store_storage_read_count = register_guarded_int_counter_vec_with_registry!(
1057 "kv_log_store_storage_read_count",
1058 "Write row count throughput of kv log store",
1059 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1060 registry
1061 )
1062 .unwrap();
1063
1064 let kv_log_store_storage_read_size = register_guarded_int_counter_vec_with_registry!(
1065 "kv_log_store_storage_read_size",
1066 "Write size throughput of kv log store",
1067 &["actor_id", "connector", "sink_id", "sink_name", "read_type"],
1068 registry
1069 )
1070 .unwrap();
1071
1072 let kv_log_store_rewind_count = register_guarded_int_counter_vec_with_registry!(
1073 "kv_log_store_rewind_count",
1074 "Kv log store rewind rate",
1075 &["actor_id", "connector", "sink_id", "sink_name"],
1076 registry
1077 )
1078 .unwrap();
1079
1080 let kv_log_store_rewind_delay_opts = {
1081 assert_eq!(2, REWIND_BACKOFF_FACTOR);
1082 let bucket_count = (REWIND_MAX_DELAY.as_secs_f64().log2()
1083 - REWIND_BASE_DELAY.as_secs_f64().log2())
1084 .ceil() as usize;
1085 let buckets = exponential_buckets(
1086 REWIND_BASE_DELAY.as_secs_f64(),
1087 REWIND_BACKOFF_FACTOR as _,
1088 bucket_count,
1089 )
1090 .unwrap();
1091 histogram_opts!(
1092 "kv_log_store_rewind_delay",
1093 "Kv log store rewind delay",
1094 buckets,
1095 )
1096 };
1097
1098 let kv_log_store_rewind_delay = register_guarded_histogram_vec_with_registry!(
1099 kv_log_store_rewind_delay_opts,
1100 &["actor_id", "connector", "sink_id", "sink_name"],
1101 registry
1102 )
1103 .unwrap();
1104
1105 let kv_log_store_buffer_unconsumed_item_count =
1106 register_guarded_int_gauge_vec_with_registry!(
1107 "kv_log_store_buffer_unconsumed_item_count",
1108 "Number of Unconsumed Item in buffer",
1109 &["actor_id", "connector", "sink_id", "sink_name"],
1110 registry
1111 )
1112 .unwrap();
1113
1114 let kv_log_store_buffer_unconsumed_row_count =
1115 register_guarded_int_gauge_vec_with_registry!(
1116 "kv_log_store_buffer_unconsumed_row_count",
1117 "Number of Unconsumed Row in buffer",
1118 &["actor_id", "connector", "sink_id", "sink_name"],
1119 registry
1120 )
1121 .unwrap();
1122
1123 let kv_log_store_buffer_unconsumed_epoch_count =
1124 register_guarded_int_gauge_vec_with_registry!(
1125 "kv_log_store_buffer_unconsumed_epoch_count",
1126 "Number of Unconsumed Epoch in buffer",
1127 &["actor_id", "connector", "sink_id", "sink_name"],
1128 registry
1129 )
1130 .unwrap();
1131
1132 let kv_log_store_buffer_unconsumed_min_epoch =
1133 register_guarded_int_gauge_vec_with_registry!(
1134 "kv_log_store_buffer_unconsumed_min_epoch",
1135 "Number of Unconsumed Epoch in buffer",
1136 &["actor_id", "connector", "sink_id", "sink_name"],
1137 registry
1138 )
1139 .unwrap();
1140
1141 let crossdb_last_consumed_min_epoch = register_guarded_int_gauge_vec_with_registry!(
1142 "crossdb_last_consumed_min_epoch",
1143 "Last consumed min epoch for cross-database changelog stream scan",
1144 &["table_id", "actor_id", "fragment_id"],
1145 registry
1146 )
1147 .unwrap();
1148
1149 let kv_log_store_buffer_memory_bytes =
1150 register_guarded_int_gauge_vec_with_registry!(
1151 "kv_log_store_buffer_memory_bytes",
1152 "Estimated heap bytes used by kv log store buffer (unconsumed + consumed but not truncated)",
1153 &["actor_id", "connector", "sink_id", "sink_name"],
1154 registry
1155 )
1156 .unwrap();
1157
1158 let lru_runtime_loop_count = register_int_counter_with_registry!(
1159 "lru_runtime_loop_count",
1160 "The counts of the eviction loop in LRU manager per second",
1161 registry
1162 )
1163 .unwrap();
1164
1165 let lru_latest_sequence = register_int_gauge_with_registry!(
1166 "lru_latest_sequence",
1167 "Current LRU global sequence",
1168 registry,
1169 )
1170 .unwrap();
1171
1172 let lru_watermark_sequence = register_int_gauge_with_registry!(
1173 "lru_watermark_sequence",
1174 "Current LRU watermark sequence",
1175 registry,
1176 )
1177 .unwrap();
1178
1179 let lru_eviction_policy = register_int_gauge_with_registry!(
1180 "lru_eviction_policy",
1181 "Current LRU eviction policy",
1182 registry,
1183 )
1184 .unwrap();
1185
1186 let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
1187 "jemalloc_allocated_bytes",
1188 "The allocated memory jemalloc, got from jemalloc_ctl",
1189 registry
1190 )
1191 .unwrap();
1192
1193 let jemalloc_active_bytes = register_int_gauge_with_registry!(
1194 "jemalloc_active_bytes",
1195 "The active memory jemalloc, got from jemalloc_ctl",
1196 registry
1197 )
1198 .unwrap();
1199
1200 let jemalloc_resident_bytes = register_int_gauge_with_registry!(
1201 "jemalloc_resident_bytes",
1202 "The active memory jemalloc, got from jemalloc_ctl",
1203 registry
1204 )
1205 .unwrap();
1206
1207 let jemalloc_metadata_bytes = register_int_gauge_with_registry!(
1208 "jemalloc_metadata_bytes",
1209 "The active memory jemalloc, got from jemalloc_ctl",
1210 registry
1211 )
1212 .unwrap();
1213
1214 let jvm_allocated_bytes = register_int_gauge_with_registry!(
1215 "jvm_allocated_bytes",
1216 "The allocated jvm memory",
1217 registry
1218 )
1219 .unwrap();
1220
1221 let jvm_active_bytes = register_int_gauge_with_registry!(
1222 "jvm_active_bytes",
1223 "The active jvm memory",
1224 registry
1225 )
1226 .unwrap();
1227
1228 let materialize_cache_hit_count = register_guarded_int_counter_vec_with_registry!(
1229 "stream_materialize_cache_hit_count",
1230 "Materialize executor cache hit count",
1231 &["actor_id", "table_id", "fragment_id"],
1232 registry
1233 )
1234 .unwrap()
1235 .relabel_debug_1(level);
1236
1237 let materialize_data_exist_count = register_guarded_int_counter_vec_with_registry!(
1238 "stream_materialize_data_exist_count",
1239 "Materialize executor data exist count",
1240 &["actor_id", "table_id", "fragment_id"],
1241 registry
1242 )
1243 .unwrap()
1244 .relabel_debug_1(level);
1245
1246 let materialize_cache_total_count = register_guarded_int_counter_vec_with_registry!(
1247 "stream_materialize_cache_total_count",
1248 "Materialize executor cache total operation",
1249 &["actor_id", "table_id", "fragment_id"],
1250 registry
1251 )
1252 .unwrap()
1253 .relabel_debug_1(level);
1254
1255 let stream_memory_usage = register_guarded_int_gauge_vec_with_registry!(
1256 "stream_memory_usage",
1257 "Memory usage for stream executors",
1258 &["actor_id", "table_id", "desc"],
1259 registry
1260 )
1261 .unwrap()
1262 .relabel_debug_1(level);
1263
1264 let gap_fill_generated_rows_count = register_guarded_int_counter_vec_with_registry!(
1265 "gap_fill_generated_rows_count",
1266 "Total number of rows generated by gap fill executor",
1267 &["actor_id", "fragment_id"],
1268 registry
1269 )
1270 .unwrap()
1271 .relabel_debug_1(level);
1272
1273 let state_table_iter_count = register_guarded_int_counter_vec_with_registry!(
1274 "state_table_iter_count",
1275 "Total number of state table iter operations",
1276 &["actor_id", "fragment_id", "table_id"],
1277 registry
1278 )
1279 .unwrap()
1280 .relabel_debug_1(level);
1281
1282 let state_table_get_count = register_guarded_int_counter_vec_with_registry!(
1283 "state_table_get_count",
1284 "Total number of state table get operations",
1285 &["actor_id", "fragment_id", "table_id"],
1286 registry
1287 )
1288 .unwrap()
1289 .relabel_debug_1(level);
1290
1291 let state_table_iter_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1292 "state_table_iter_vnode_pruned_count",
1293 "Total number of state table iter operations pruned by vnode statistics",
1294 &["actor_id", "fragment_id", "table_id"],
1295 registry
1296 )
1297 .unwrap()
1298 .relabel_debug_1(level);
1299
1300 let state_table_get_vnode_pruned_count = register_guarded_int_counter_vec_with_registry!(
1301 "state_table_get_vnode_pruned_count",
1302 "Total number of state table get operations pruned by vnode statistics",
1303 &["actor_id", "fragment_id", "table_id"],
1304 registry
1305 )
1306 .unwrap()
1307 .relabel_debug_1(level);
1308
1309 Self {
1310 level,
1311 executor_row_count,
1312 mem_stream_node_output_row_count: stream_node_output_row_count,
1313 mem_stream_node_output_blocking_duration_ns: stream_node_output_blocking_duration_ns,
1314 actor_scheduled_duration,
1315 actor_scheduled_cnt,
1316 actor_poll_duration,
1317 actor_poll_cnt,
1318 actor_idle_duration,
1319 actor_idle_cnt,
1320 actor_count,
1321 actor_in_record_cnt,
1322 actor_out_record_cnt,
1323 fragment_channel_buffered_bytes,
1324 actor_current_epoch,
1325 source_output_row_count,
1326 source_split_change_count,
1327 source_backfill_row_count,
1328 sink_input_row_count,
1329 sink_input_bytes,
1330 sink_chunk_buffer_size,
1331 exchange_frag_recv_size,
1332 merge_barrier_align_duration,
1333 actor_output_buffer_blocking_duration_ns,
1334 actor_input_buffer_blocking_duration_ns,
1335 join_lookup_miss_count,
1336 join_lookup_total_count,
1337 join_insert_cache_miss_count,
1338 join_actor_input_waiting_duration_ns,
1339 join_match_duration_ns,
1340 join_cached_entry_count,
1341 join_matched_join_keys,
1342 barrier_align_duration,
1343 agg_lookup_miss_count,
1344 agg_total_lookup_count,
1345 agg_cached_entry_count,
1346 agg_chunk_lookup_miss_count,
1347 agg_chunk_total_lookup_count,
1348 agg_dirty_groups_count,
1349 agg_dirty_groups_heap_size,
1350 agg_distinct_cache_miss_count,
1351 agg_distinct_total_cache_count,
1352 agg_distinct_cached_entry_count,
1353 agg_state_cache_lookup_count,
1354 agg_state_cache_miss_count,
1355 group_top_n_cache_miss_count,
1356 group_top_n_total_query_cache_count,
1357 group_top_n_cached_entry_count,
1358 group_top_n_appendonly_cache_miss_count,
1359 group_top_n_appendonly_total_query_cache_count,
1360 group_top_n_appendonly_cached_entry_count,
1361 lookup_cache_miss_count,
1362 lookup_total_query_cache_count,
1363 lookup_cached_entry_count,
1364 temporal_join_cache_miss_count,
1365 temporal_join_total_query_cache_count,
1366 temporal_join_cached_entry_count,
1367 backfill_snapshot_read_row_count,
1368 backfill_upstream_output_row_count,
1369 cdc_backfill_snapshot_read_row_count,
1370 cdc_backfill_upstream_output_row_count,
1371 snapshot_backfill_consume_row_count,
1372 over_window_cached_entry_count,
1373 over_window_cache_lookup_count,
1374 over_window_cache_miss_count,
1375 over_window_range_cache_entry_count,
1376 over_window_range_cache_lookup_count,
1377 over_window_range_cache_left_miss_count,
1378 over_window_range_cache_right_miss_count,
1379 over_window_accessed_entry_count,
1380 over_window_compute_count,
1381 over_window_same_output_count,
1382 barrier_inflight_latency,
1383 barrier_sync_latency,
1384 barrier_batch_size,
1385 barrier_manager_progress,
1386 kv_log_store_storage_write_count,
1387 kv_log_store_storage_write_size,
1388 kv_log_store_rewind_count,
1389 kv_log_store_rewind_delay,
1390 kv_log_store_storage_read_count,
1391 kv_log_store_storage_read_size,
1392 kv_log_store_buffer_unconsumed_item_count,
1393 kv_log_store_buffer_unconsumed_row_count,
1394 kv_log_store_buffer_unconsumed_epoch_count,
1395 kv_log_store_buffer_unconsumed_min_epoch,
1396 kv_log_store_buffer_memory_bytes,
1397 crossdb_last_consumed_min_epoch,
1398 sync_kv_log_store_read_count,
1399 sync_kv_log_store_read_size,
1400 sync_kv_log_store_write_pause_duration_ns,
1401 sync_kv_log_store_state,
1402 sync_kv_log_store_wait_next_poll_ns,
1403 sync_kv_log_store_storage_write_count,
1404 sync_kv_log_store_storage_write_size,
1405 sync_kv_log_store_buffer_unconsumed_item_count,
1406 sync_kv_log_store_buffer_unconsumed_row_count,
1407 sync_kv_log_store_buffer_unconsumed_epoch_count,
1408 sync_kv_log_store_buffer_unconsumed_min_epoch,
1409 sync_kv_log_store_buffer_memory_bytes,
1410 lru_runtime_loop_count,
1411 lru_latest_sequence,
1412 lru_watermark_sequence,
1413 lru_eviction_policy,
1414 jemalloc_allocated_bytes,
1415 jemalloc_active_bytes,
1416 jemalloc_resident_bytes,
1417 jemalloc_metadata_bytes,
1418 jvm_allocated_bytes,
1419 jvm_active_bytes,
1420 stream_memory_usage,
1421 materialize_cache_hit_count,
1422 materialize_data_exist_count,
1423 materialize_cache_total_count,
1424 materialize_input_row_count,
1425 materialize_current_epoch,
1426 pg_cdc_state_table_lsn,
1427 pg_cdc_jni_commit_offset_lsn,
1428 mysql_cdc_state_binlog_file_seq,
1429 mysql_cdc_state_binlog_position,
1430 sqlserver_cdc_state_change_lsn,
1431 sqlserver_cdc_state_commit_lsn,
1432 sqlserver_cdc_jni_commit_offset_lsn,
1433 gap_fill_generated_rows_count,
1434 state_table_iter_count,
1435 state_table_get_count,
1436 state_table_iter_vnode_pruned_count,
1437 state_table_get_vnode_pruned_count,
1438 }
1439 }
1440
1441 pub fn unused() -> Self {
1443 global_streaming_metrics(MetricLevel::Disabled)
1444 }
1445
1446 pub fn new_actor_metrics(&self, actor_id: ActorId, fragment_id: FragmentId) -> ActorMetrics {
1447 let label_list: &[&str; 2] = &[&actor_id.to_string(), &fragment_id.to_string()];
1448 let actor_scheduled_duration = self
1449 .actor_scheduled_duration
1450 .with_guarded_label_values(label_list);
1451 let actor_scheduled_cnt = self
1452 .actor_scheduled_cnt
1453 .with_guarded_label_values(label_list);
1454 let actor_poll_duration = self
1455 .actor_poll_duration
1456 .with_guarded_label_values(label_list);
1457 let actor_poll_cnt = self.actor_poll_cnt.with_guarded_label_values(label_list);
1458 let actor_idle_duration = self
1459 .actor_idle_duration
1460 .with_guarded_label_values(label_list);
1461 let actor_idle_cnt = self.actor_idle_cnt.with_guarded_label_values(label_list);
1462 ActorMetrics {
1463 actor_scheduled_duration,
1464 actor_scheduled_cnt,
1465 actor_poll_duration,
1466 actor_poll_cnt,
1467 actor_idle_duration,
1468 actor_idle_cnt,
1469 }
1470 }
1471
1472 pub(crate) fn new_actor_input_metrics(
1473 &self,
1474 actor_id: ActorId,
1475 fragment_id: FragmentId,
1476 upstream_fragment_id: FragmentId,
1477 ) -> ActorInputMetrics {
1478 let actor_id_str = actor_id.to_string();
1479 let fragment_id_str = fragment_id.to_string();
1480 let upstream_fragment_id_str = upstream_fragment_id.to_string();
1481 ActorInputMetrics {
1482 actor_in_record_cnt: self.actor_in_record_cnt.with_guarded_label_values(&[
1483 &actor_id_str,
1484 &fragment_id_str,
1485 &upstream_fragment_id_str,
1486 ]),
1487 actor_input_buffer_blocking_duration_ns: self
1488 .actor_input_buffer_blocking_duration_ns
1489 .with_guarded_label_values(&[
1490 &actor_id_str,
1491 &fragment_id_str,
1492 &upstream_fragment_id_str,
1493 ]),
1494 }
1495 }
1496
1497 pub fn new_sink_exec_metrics(
1498 &self,
1499 id: SinkId,
1500 actor_id: ActorId,
1501 fragment_id: FragmentId,
1502 ) -> SinkExecutorMetrics {
1503 let label_list: &[&str; 3] = &[
1504 &id.to_string(),
1505 &actor_id.to_string(),
1506 &fragment_id.to_string(),
1507 ];
1508 SinkExecutorMetrics {
1509 sink_input_row_count: self
1510 .sink_input_row_count
1511 .with_guarded_label_values(label_list),
1512 sink_input_bytes: self.sink_input_bytes.with_guarded_label_values(label_list),
1513 sink_chunk_buffer_size: self
1514 .sink_chunk_buffer_size
1515 .with_guarded_label_values(label_list),
1516 }
1517 }
1518
1519 pub fn new_group_top_n_metrics(
1520 &self,
1521 table_id: TableId,
1522 actor_id: ActorId,
1523 fragment_id: FragmentId,
1524 ) -> GroupTopNMetrics {
1525 let label_list: &[&str; 3] = &[
1526 &table_id.to_string(),
1527 &actor_id.to_string(),
1528 &fragment_id.to_string(),
1529 ];
1530
1531 GroupTopNMetrics {
1532 group_top_n_cache_miss_count: self
1533 .group_top_n_cache_miss_count
1534 .with_guarded_label_values(label_list),
1535 group_top_n_total_query_cache_count: self
1536 .group_top_n_total_query_cache_count
1537 .with_guarded_label_values(label_list),
1538 group_top_n_cached_entry_count: self
1539 .group_top_n_cached_entry_count
1540 .with_guarded_label_values(label_list),
1541 }
1542 }
1543
1544 pub fn new_append_only_group_top_n_metrics(
1545 &self,
1546 table_id: TableId,
1547 actor_id: ActorId,
1548 fragment_id: FragmentId,
1549 ) -> GroupTopNMetrics {
1550 let label_list: &[&str; 3] = &[
1551 &table_id.to_string(),
1552 &actor_id.to_string(),
1553 &fragment_id.to_string(),
1554 ];
1555
1556 GroupTopNMetrics {
1557 group_top_n_cache_miss_count: self
1558 .group_top_n_appendonly_cache_miss_count
1559 .with_guarded_label_values(label_list),
1560 group_top_n_total_query_cache_count: self
1561 .group_top_n_appendonly_total_query_cache_count
1562 .with_guarded_label_values(label_list),
1563 group_top_n_cached_entry_count: self
1564 .group_top_n_appendonly_cached_entry_count
1565 .with_guarded_label_values(label_list),
1566 }
1567 }
1568
1569 pub fn new_lookup_executor_metrics(
1570 &self,
1571 table_id: TableId,
1572 actor_id: ActorId,
1573 fragment_id: FragmentId,
1574 ) -> LookupExecutorMetrics {
1575 let label_list: &[&str; 3] = &[
1576 &table_id.to_string(),
1577 &actor_id.to_string(),
1578 &fragment_id.to_string(),
1579 ];
1580
1581 LookupExecutorMetrics {
1582 lookup_cache_miss_count: self
1583 .lookup_cache_miss_count
1584 .with_guarded_label_values(label_list),
1585 lookup_total_query_cache_count: self
1586 .lookup_total_query_cache_count
1587 .with_guarded_label_values(label_list),
1588 lookup_cached_entry_count: self
1589 .lookup_cached_entry_count
1590 .with_guarded_label_values(label_list),
1591 }
1592 }
1593
1594 pub fn new_hash_agg_metrics(
1595 &self,
1596 table_id: TableId,
1597 actor_id: ActorId,
1598 fragment_id: FragmentId,
1599 ) -> HashAggMetrics {
1600 let label_list: &[&str; 3] = &[
1601 &table_id.to_string(),
1602 &actor_id.to_string(),
1603 &fragment_id.to_string(),
1604 ];
1605 HashAggMetrics {
1606 agg_lookup_miss_count: self
1607 .agg_lookup_miss_count
1608 .with_guarded_label_values(label_list),
1609 agg_total_lookup_count: self
1610 .agg_total_lookup_count
1611 .with_guarded_label_values(label_list),
1612 agg_cached_entry_count: self
1613 .agg_cached_entry_count
1614 .with_guarded_label_values(label_list),
1615 agg_chunk_lookup_miss_count: self
1616 .agg_chunk_lookup_miss_count
1617 .with_guarded_label_values(label_list),
1618 agg_chunk_total_lookup_count: self
1619 .agg_chunk_total_lookup_count
1620 .with_guarded_label_values(label_list),
1621 agg_dirty_groups_count: self
1622 .agg_dirty_groups_count
1623 .with_guarded_label_values(label_list),
1624 agg_dirty_groups_heap_size: self
1625 .agg_dirty_groups_heap_size
1626 .with_guarded_label_values(label_list),
1627 agg_state_cache_lookup_count: self
1628 .agg_state_cache_lookup_count
1629 .with_guarded_label_values(label_list),
1630 agg_state_cache_miss_count: self
1631 .agg_state_cache_miss_count
1632 .with_guarded_label_values(label_list),
1633 }
1634 }
1635
1636 pub fn new_agg_distinct_dedup_metrics(
1637 &self,
1638 table_id: TableId,
1639 actor_id: ActorId,
1640 fragment_id: FragmentId,
1641 ) -> AggDistinctDedupMetrics {
1642 let label_list: &[&str; 3] = &[
1643 &table_id.to_string(),
1644 &actor_id.to_string(),
1645 &fragment_id.to_string(),
1646 ];
1647 AggDistinctDedupMetrics {
1648 agg_distinct_cache_miss_count: self
1649 .agg_distinct_cache_miss_count
1650 .with_guarded_label_values(label_list),
1651 agg_distinct_total_cache_count: self
1652 .agg_distinct_total_cache_count
1653 .with_guarded_label_values(label_list),
1654 agg_distinct_cached_entry_count: self
1655 .agg_distinct_cached_entry_count
1656 .with_guarded_label_values(label_list),
1657 }
1658 }
1659
1660 pub fn new_temporal_join_metrics(
1661 &self,
1662 table_id: TableId,
1663 actor_id: ActorId,
1664 fragment_id: FragmentId,
1665 ) -> TemporalJoinMetrics {
1666 let label_list: &[&str; 3] = &[
1667 &table_id.to_string(),
1668 &actor_id.to_string(),
1669 &fragment_id.to_string(),
1670 ];
1671 TemporalJoinMetrics {
1672 temporal_join_cache_miss_count: self
1673 .temporal_join_cache_miss_count
1674 .with_guarded_label_values(label_list),
1675 temporal_join_total_query_cache_count: self
1676 .temporal_join_total_query_cache_count
1677 .with_guarded_label_values(label_list),
1678 temporal_join_cached_entry_count: self
1679 .temporal_join_cached_entry_count
1680 .with_guarded_label_values(label_list),
1681 }
1682 }
1683
1684 pub fn new_backfill_metrics(&self, table_id: TableId, actor_id: ActorId) -> BackfillMetrics {
1685 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1686 BackfillMetrics {
1687 backfill_snapshot_read_row_count: self
1688 .backfill_snapshot_read_row_count
1689 .with_guarded_label_values(label_list),
1690 backfill_upstream_output_row_count: self
1691 .backfill_upstream_output_row_count
1692 .with_guarded_label_values(label_list),
1693 }
1694 }
1695
1696 pub fn new_cdc_backfill_metrics(
1697 &self,
1698 table_id: TableId,
1699 actor_id: ActorId,
1700 ) -> CdcBackfillMetrics {
1701 let label_list: &[&str; 2] = &[&table_id.to_string(), &actor_id.to_string()];
1702 CdcBackfillMetrics {
1703 cdc_backfill_snapshot_read_row_count: self
1704 .cdc_backfill_snapshot_read_row_count
1705 .with_guarded_label_values(label_list),
1706 cdc_backfill_upstream_output_row_count: self
1707 .cdc_backfill_upstream_output_row_count
1708 .with_guarded_label_values(label_list),
1709 }
1710 }
1711
1712 pub fn new_over_window_metrics(
1713 &self,
1714 table_id: TableId,
1715 actor_id: ActorId,
1716 fragment_id: FragmentId,
1717 ) -> OverWindowMetrics {
1718 let label_list: &[&str; 3] = &[
1719 &table_id.to_string(),
1720 &actor_id.to_string(),
1721 &fragment_id.to_string(),
1722 ];
1723 OverWindowMetrics {
1724 over_window_cached_entry_count: self
1725 .over_window_cached_entry_count
1726 .with_guarded_label_values(label_list),
1727 over_window_cache_lookup_count: self
1728 .over_window_cache_lookup_count
1729 .with_guarded_label_values(label_list),
1730 over_window_cache_miss_count: self
1731 .over_window_cache_miss_count
1732 .with_guarded_label_values(label_list),
1733 over_window_range_cache_entry_count: self
1734 .over_window_range_cache_entry_count
1735 .with_guarded_label_values(label_list),
1736 over_window_range_cache_lookup_count: self
1737 .over_window_range_cache_lookup_count
1738 .with_guarded_label_values(label_list),
1739 over_window_range_cache_left_miss_count: self
1740 .over_window_range_cache_left_miss_count
1741 .with_guarded_label_values(label_list),
1742 over_window_range_cache_right_miss_count: self
1743 .over_window_range_cache_right_miss_count
1744 .with_guarded_label_values(label_list),
1745 over_window_accessed_entry_count: self
1746 .over_window_accessed_entry_count
1747 .with_guarded_label_values(label_list),
1748 over_window_compute_count: self
1749 .over_window_compute_count
1750 .with_guarded_label_values(label_list),
1751 over_window_same_output_count: self
1752 .over_window_same_output_count
1753 .with_guarded_label_values(label_list),
1754 }
1755 }
1756
1757 pub fn new_materialize_cache_metrics(
1758 &self,
1759 table_id: TableId,
1760 actor_id: ActorId,
1761 fragment_id: FragmentId,
1762 ) -> MaterializeCacheMetrics {
1763 let label_list: &[&str; 3] = &[
1764 &actor_id.to_string(),
1765 &table_id.to_string(),
1766 &fragment_id.to_string(),
1767 ];
1768 MaterializeCacheMetrics {
1769 materialize_cache_hit_count: self
1770 .materialize_cache_hit_count
1771 .with_guarded_label_values(label_list),
1772 materialize_data_exist_count: self
1773 .materialize_data_exist_count
1774 .with_guarded_label_values(label_list),
1775 materialize_cache_total_count: self
1776 .materialize_cache_total_count
1777 .with_guarded_label_values(label_list),
1778 }
1779 }
1780
1781 pub fn new_materialize_metrics(
1782 &self,
1783 table_id: TableId,
1784 actor_id: ActorId,
1785 fragment_id: FragmentId,
1786 ) -> MaterializeMetrics {
1787 let label_list: &[&str; 3] = &[
1788 &actor_id.to_string(),
1789 &table_id.to_string(),
1790 &fragment_id.to_string(),
1791 ];
1792 MaterializeMetrics {
1793 materialize_input_row_count: self
1794 .materialize_input_row_count
1795 .with_guarded_label_values(label_list),
1796 materialize_current_epoch: self
1797 .materialize_current_epoch
1798 .with_guarded_label_values(label_list),
1799 }
1800 }
1801
1802 pub fn new_state_table_metrics(
1803 &self,
1804 table_id: TableId,
1805 actor_id: ActorId,
1806 fragment_id: FragmentId,
1807 ) -> StateTableMetrics {
1808 let label_list: &[&str; 3] = &[
1809 &actor_id.to_string(),
1810 &fragment_id.to_string(),
1811 &table_id.to_string(),
1812 ];
1813 StateTableMetrics {
1814 iter_count: self
1815 .state_table_iter_count
1816 .with_guarded_label_values(label_list),
1817 get_count: self
1818 .state_table_get_count
1819 .with_guarded_label_values(label_list),
1820 iter_vnode_pruned_count: self
1821 .state_table_iter_vnode_pruned_count
1822 .with_guarded_label_values(label_list),
1823 get_vnode_pruned_count: self
1824 .state_table_get_vnode_pruned_count
1825 .with_guarded_label_values(label_list),
1826 }
1827 }
1828}
1829
/// Metric handles for one input of an actor, as returned by
/// `StreamingMetrics::new_actor_input_metrics`.
///
/// Both handles are bound to the label values
/// (actor id, fragment id, upstream fragment id).
pub(crate) struct ActorInputMetrics {
    /// Handle taken from the `actor_in_record_cnt` metric vector.
    pub(crate) actor_in_record_cnt: LabelGuardedIntCounter,
    /// Handle taken from the `actor_input_buffer_blocking_duration_ns` metric vector.
    /// NOTE(review): presumably nanoseconds, going by the `_ns` suffix — confirm at the
    /// registration site.
    pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
1834
/// Metric handles for a single actor, as returned by
/// `StreamingMetrics::new_actor_metrics`.
///
/// Every handle is bound to the label values (actor id, fragment id).
pub struct ActorMetrics {
    /// Handle taken from the `actor_scheduled_duration` metric vector.
    pub actor_scheduled_duration: LabelGuardedIntCounter,
    /// Handle taken from the `actor_scheduled_cnt` metric vector.
    pub actor_scheduled_cnt: LabelGuardedIntCounter,
    /// Handle taken from the `actor_poll_duration` metric vector.
    pub actor_poll_duration: LabelGuardedIntCounter,
    /// Handle taken from the `actor_poll_cnt` metric vector.
    pub actor_poll_cnt: LabelGuardedIntCounter,
    /// Handle taken from the `actor_idle_duration` metric vector.
    pub actor_idle_duration: LabelGuardedIntCounter,
    /// Handle taken from the `actor_idle_cnt` metric vector.
    pub actor_idle_cnt: LabelGuardedIntCounter,
}
1844
/// Metric handles for one sink executor, as returned by
/// `StreamingMetrics::new_sink_exec_metrics`.
///
/// Every handle is bound to the label values (sink id, actor id, fragment id).
pub struct SinkExecutorMetrics {
    /// Counter handle taken from the `sink_input_row_count` metric vector.
    pub sink_input_row_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `sink_input_bytes` metric vector.
    pub sink_input_bytes: LabelGuardedIntCounter,
    /// Gauge handle taken from the `sink_chunk_buffer_size` metric vector.
    pub sink_chunk_buffer_size: LabelGuardedIntGauge,
}
1850
/// Cache-related metric handles for one materialize executor, as returned by
/// `StreamingMetrics::new_materialize_cache_metrics`.
///
/// Every handle is bound to the label values (actor id, table id, fragment id).
pub struct MaterializeCacheMetrics {
    /// `stream_materialize_cache_hit_count`: materialize executor cache hit count.
    pub materialize_cache_hit_count: LabelGuardedIntCounter,
    /// `stream_materialize_data_exist_count`: materialize executor data exist count.
    pub materialize_data_exist_count: LabelGuardedIntCounter,
    /// `stream_materialize_cache_total_count`: materialize executor cache total operation.
    pub materialize_cache_total_count: LabelGuardedIntCounter,
}
1856
/// Metric handles for one materialize executor, as returned by
/// `StreamingMetrics::new_materialize_metrics`.
///
/// Both handles are bound to the label values (actor id, table id, fragment id).
pub struct MaterializeMetrics {
    /// Counter handle taken from the `materialize_input_row_count` metric vector.
    pub materialize_input_row_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `materialize_current_epoch` metric vector.
    pub materialize_current_epoch: LabelGuardedIntGauge,
}
1861
/// Metric handles for a group top-n executor.
///
/// Returned by both `StreamingMetrics::new_group_top_n_metrics` (backed by the
/// `group_top_n_*` vectors) and `StreamingMetrics::new_append_only_group_top_n_metrics`
/// (backed by the `group_top_n_appendonly_*` vectors). In both cases every handle is bound
/// to the label values (table id, actor id, fragment id).
pub struct GroupTopNMetrics {
    /// Counter handle for the cache-miss-count vector of the chosen variant.
    pub group_top_n_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle for the total-query-cache-count vector of the chosen variant.
    pub group_top_n_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge handle for the cached-entry-count vector of the chosen variant.
    pub group_top_n_cached_entry_count: LabelGuardedIntGauge,
}
1867
/// Metric handles for one lookup executor, as returned by
/// `StreamingMetrics::new_lookup_executor_metrics`.
///
/// Every handle is bound to the label values (table id, actor id, fragment id).
pub struct LookupExecutorMetrics {
    /// Counter handle taken from the `lookup_cache_miss_count` metric vector.
    pub lookup_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `lookup_total_query_cache_count` metric vector.
    pub lookup_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `lookup_cached_entry_count` metric vector.
    pub lookup_cached_entry_count: LabelGuardedIntGauge,
}
1873
/// Metric handles for one hash aggregation executor, as returned by
/// `StreamingMetrics::new_hash_agg_metrics`.
///
/// Every handle is bound to the label values (table id, actor id, fragment id).
pub struct HashAggMetrics {
    /// Counter handle taken from the `agg_lookup_miss_count` metric vector.
    pub agg_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `agg_total_lookup_count` metric vector.
    pub agg_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `agg_cached_entry_count` metric vector.
    pub agg_cached_entry_count: LabelGuardedIntGauge,
    /// Counter handle taken from the `agg_chunk_lookup_miss_count` metric vector.
    pub agg_chunk_lookup_miss_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `agg_chunk_total_lookup_count` metric vector.
    pub agg_chunk_total_lookup_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `agg_dirty_groups_count` metric vector.
    pub agg_dirty_groups_count: LabelGuardedIntGauge,
    /// Gauge handle taken from the `agg_dirty_groups_heap_size` metric vector.
    pub agg_dirty_groups_heap_size: LabelGuardedIntGauge,
    /// Counter handle taken from the `agg_state_cache_lookup_count` metric vector.
    pub agg_state_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `agg_state_cache_miss_count` metric vector.
    pub agg_state_cache_miss_count: LabelGuardedIntCounter,
}
1885
/// Metric handles for distinct-aggregation deduplication, as returned by
/// `StreamingMetrics::new_agg_distinct_dedup_metrics`.
///
/// Every handle is bound to the label values (table id, actor id, fragment id).
pub struct AggDistinctDedupMetrics {
    /// Counter handle taken from the `agg_distinct_cache_miss_count` metric vector.
    pub agg_distinct_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `agg_distinct_total_cache_count` metric vector.
    pub agg_distinct_total_cache_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `agg_distinct_cached_entry_count` metric vector.
    pub agg_distinct_cached_entry_count: LabelGuardedIntGauge,
}
1891
/// Metric handles for one temporal join executor, as returned by
/// `StreamingMetrics::new_temporal_join_metrics`.
///
/// Every handle is bound to the label values (table id, actor id, fragment id).
pub struct TemporalJoinMetrics {
    /// Counter handle taken from the `temporal_join_cache_miss_count` metric vector.
    pub temporal_join_cache_miss_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `temporal_join_total_query_cache_count` metric vector.
    pub temporal_join_total_query_cache_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `temporal_join_cached_entry_count` metric vector.
    pub temporal_join_cached_entry_count: LabelGuardedIntGauge,
}
1897
/// Metric handles for one backfill executor, as returned by
/// `StreamingMetrics::new_backfill_metrics`.
///
/// Both handles are bound to the label values (table id, actor id).
pub struct BackfillMetrics {
    /// Counter handle taken from the `backfill_snapshot_read_row_count` metric vector.
    pub backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `backfill_upstream_output_row_count` metric vector.
    pub backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1902
/// Metric handles for one CDC backfill executor, as returned by
/// `StreamingMetrics::new_cdc_backfill_metrics`.
///
/// Both handles are bound to the label values (table id, actor id).
#[derive(Clone)]
pub struct CdcBackfillMetrics {
    /// Counter handle taken from the `cdc_backfill_snapshot_read_row_count` metric vector.
    pub cdc_backfill_snapshot_read_row_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `cdc_backfill_upstream_output_row_count` metric vector.
    pub cdc_backfill_upstream_output_row_count: LabelGuardedIntCounter,
}
1908
/// Metric handles for one over-window executor, as returned by
/// `StreamingMetrics::new_over_window_metrics`.
///
/// Every handle is bound to the label values (table id, actor id, fragment id).
pub struct OverWindowMetrics {
    /// Gauge handle taken from the `over_window_cached_entry_count` metric vector.
    pub over_window_cached_entry_count: LabelGuardedIntGauge,
    /// Counter handle taken from the `over_window_cache_lookup_count` metric vector.
    pub over_window_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `over_window_cache_miss_count` metric vector.
    pub over_window_cache_miss_count: LabelGuardedIntCounter,
    /// Gauge handle taken from the `over_window_range_cache_entry_count` metric vector.
    pub over_window_range_cache_entry_count: LabelGuardedIntGauge,
    /// Counter handle taken from the `over_window_range_cache_lookup_count` metric vector.
    pub over_window_range_cache_lookup_count: LabelGuardedIntCounter,
    /// Counter handle taken from the `over_window_range_cache_left_miss_count` metric vector.
    pub over_window_range_cache_left_miss_count: LabelGuardedIntCounter,
    /// `stream_over_window_range_cache_right_miss_count`: over window partition range
    /// cache right miss count.
    pub over_window_range_cache_right_miss_count: LabelGuardedIntCounter,
    /// `stream_over_window_accessed_entry_count`: over window accessed entry count.
    pub over_window_accessed_entry_count: LabelGuardedIntCounter,
    /// `stream_over_window_compute_count`: over window compute count.
    pub over_window_compute_count: LabelGuardedIntCounter,
    /// `stream_over_window_same_output_count`: over window same output count.
    pub over_window_same_output_count: LabelGuardedIntCounter,
}
1921
/// Metric handles for one state table, as returned by
/// `StreamingMetrics::new_state_table_metrics`.
///
/// Every handle is bound to the label values (actor id, fragment id, table id).
pub struct StateTableMetrics {
    /// `state_table_iter_count`: total number of state table iter operations.
    pub iter_count: LabelGuardedIntCounter,
    /// `state_table_get_count`: total number of state table get operations.
    pub get_count: LabelGuardedIntCounter,
    /// `state_table_iter_vnode_pruned_count`: total number of state table iter operations
    /// pruned by vnode statistics.
    pub iter_vnode_pruned_count: LabelGuardedIntCounter,
    /// `state_table_get_vnode_pruned_count`: total number of state table get operations
    /// pruned by vnode statistics.
    pub get_vnode_pruned_count: LabelGuardedIntCounter,
}