risingwave_storage/hummock/shared_buffer/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
/// Fixed table id (233) that `TableMemoryMetrics::for_test` passes to
/// `TableMemoryMetrics::new` when building a test instance.
pub(crate) const TEST_TABLE_ID: TableId = TableId::new(233);
16
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use risingwave_common::metrics::{
21    LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge,
22    LazyLabelGuardedIntCounter, LazyLabelGuardedIntGauge,
23};
24use risingwave_pb::id::{FragmentId, TableId};
25
26use crate::monitor::HummockStateStoreMetrics;
27
28pub mod shared_buffer_batch;
29
/// Label-bound handles to the `HummockStateStoreMetrics` families that are
/// tracked for a single table.
///
/// Instances are built by `TableMemoryMetrics::new`, which binds each handle
/// to label values derived from the table id (and, for one gauge, the
/// fragment id).
pub(crate) struct TableMemoryMetrics {
    /// Bound from `per_table_imm_size`; `inc_imm` adds the given size to it
    /// and `dec_imm` subtracts it.
    imm_total_size: LabelGuardedIntGauge,
    /// Bound from `per_table_imm_count`; incremented by `inc_imm` and
    /// decremented by `dec_imm`.
    imm_count: LabelGuardedIntGauge,
    /// Bound from `write_batch_tuple_counts`. Updated by code outside this
    /// module section (the field is `pub`).
    pub write_batch_tuple_counts: LabelGuardedIntCounter,
    /// Bound from `write_batch_duration`. Updated by code outside this
    /// module section (the field is `pub`).
    pub write_batch_duration: LabelGuardedHistogram,
    /// Bound from `write_batch_size`. Updated by code outside this module
    /// section (the field is `pub`).
    pub write_batch_size: LabelGuardedHistogram,
    /// Lazily-created handle obtained from `mem_table_spill_counts`.
    pub mem_table_spill_counts: LazyLabelGuardedIntCounter,
    /// Lazily-created handle obtained from `old_value_size`.
    pub old_value_size: LazyLabelGuardedIntGauge,
}
39
40impl TableMemoryMetrics {
41    pub(super) fn new(
42        metrics: &HummockStateStoreMetrics,
43        table_id: TableId,
44        fragment_id: FragmentId,
45        is_replicated: bool,
46    ) -> Self {
47        let table_id_string = if is_replicated {
48            format!("{} replicated", table_id)
49        } else {
50            table_id.to_string()
51        };
52        let fragment_labels_vec = vec![table_id_string.clone(), fragment_id.to_string()];
53        let fragment_labels = fragment_labels_vec.as_slice();
54        let table_labels_vec = vec![table_id_string];
55        let table_labels = table_labels_vec.as_slice();
56        Self {
57            imm_total_size: metrics
58                .per_table_imm_size
59                .with_guarded_label_values(fragment_labels),
60            imm_count: metrics
61                .per_table_imm_count
62                .with_guarded_label_values(table_labels),
63            write_batch_tuple_counts: metrics
64                .write_batch_tuple_counts
65                .with_guarded_label_values(table_labels),
66            write_batch_duration: metrics
67                .write_batch_duration
68                .with_guarded_label_values(table_labels),
69            write_batch_size: metrics
70                .write_batch_size
71                .with_guarded_label_values(table_labels),
72            mem_table_spill_counts: metrics
73                .mem_table_spill_counts
74                .lazy_guarded_metrics(table_labels_vec.clone()),
75            old_value_size: metrics
76                .old_value_size
77                .lazy_guarded_metrics(table_labels_vec),
78        }
79    }
80
81    pub(super) fn for_test() -> Arc<Self> {
82        Self::new(
83            &HummockStateStoreMetrics::unused(),
84            TEST_TABLE_ID,
85            FragmentId::default(),
86            false,
87        )
88        .into()
89    }
90
91    pub(super) fn inc_imm(&self, imm_size: usize) {
92        self.imm_total_size.add(imm_size as _);
93        self.imm_count.inc();
94    }
95
96    pub(super) fn dec_imm(&self, imm_size: usize) {
97        self.imm_total_size.sub(imm_size as _);
98        self.imm_count.dec();
99    }
100}
101
102impl Debug for TableMemoryMetrics {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("TableMemoryMetrics").finish()
105    }
106}