risingwave_hummock_sdk/
frontend_version.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo,
22    StateTableInfoDelta,
23};
24
25use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, resolve_pb_log_epochs};
26use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
27use crate::{HummockVersionId, INVALID_VERSION_ID};
28
/// Trimmed-down counterpart of [`HummockVersion`] that carries only the version id and the
/// state table info.
///
/// When converted to protobuf, every other field of `PbHummockVersion` is filled with its
/// default value (see `to_protobuf`).
#[derive(Clone, Debug)]
pub struct FrontendHummockVersion {
    /// Id of this version.
    pub id: HummockVersionId,
    /// State table info taken over from the full version.
    pub state_table_info: HummockVersionStateTableInfo,
}
34
35impl FrontendHummockVersion {
36    pub fn from_version(version: &HummockVersion) -> Self {
37        Self {
38            id: version.id,
39            state_table_info: version.state_table_info.clone(),
40        }
41    }
42
43    pub fn to_protobuf(&self) -> PbHummockVersion {
44        #[expect(deprecated)]
45        PbHummockVersion {
46            id: self.id,
47            levels: Default::default(),
48            max_committed_epoch: INVALID_EPOCH,
49            table_watermarks: Default::default(),
50            table_change_logs: Default::default(),
51            state_table_info: self.state_table_info.info().clone(),
52            vector_indexes: Default::default(),
53        }
54    }
55
56    pub fn from_protobuf(value: PbHummockVersion) -> Self {
57        Self {
58            id: value.id,
59            state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
60        }
61    }
62
63    pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
64        if self.id != INVALID_VERSION_ID {
65            assert_eq!(self.id, delta.prev_id);
66        }
67        self.id = delta.id;
68        self.state_table_info
69            .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
70    }
71}
72
/// Trimmed-down counterpart of [`HummockVersionDelta`].
///
/// It keeps the version ids, the removed table ids, the state table info deltas and a change
/// log delta whose values are replaced by `()`.
pub struct FrontendHummockVersionDelta {
    /// Id of the version this delta is applied on top of.
    pub prev_id: HummockVersionId,
    /// Id of the version that results from applying this delta.
    pub id: HummockVersionId,
    /// Ids of the tables removed by this delta.
    pub removed_table_id: HashSet<TableId>,
    /// State table info changes, keyed by table id.
    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
    /// Change log deltas, keyed by table id. The logged values are replaced by `()`, so only
    /// the number of new/old values, the epochs and the truncate epoch are retained.
    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
}
80
81impl FrontendHummockVersionDelta {
82    pub fn from_delta(delta: &HummockVersionDelta) -> Self {
83        Self {
84            prev_id: delta.prev_id,
85            id: delta.id,
86            removed_table_id: delta.removed_table_ids.clone(),
87            state_table_info_delta: delta.state_table_info_delta.clone(),
88            change_log_delta: delta
89                .change_log_delta
90                .iter()
91                .map(|(table_id, change_log_delta)| {
92                    (
93                        *table_id,
94                        ChangeLogDeltaCommon {
95                            truncate_epoch: change_log_delta.truncate_epoch,
96                            new_log: EpochNewChangeLogCommon {
97                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
98                                new_value: vec![(); change_log_delta.new_log.new_value.len()],
99                                old_value: vec![(); change_log_delta.new_log.old_value.len()],
100                                non_checkpoint_epochs: change_log_delta
101                                    .new_log
102                                    .non_checkpoint_epochs
103                                    .clone(),
104                                checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
105                            },
106                        },
107                    )
108                })
109                .collect(),
110        }
111    }
112
113    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
114        #[expect(deprecated)]
115        PbHummockVersionDelta {
116            id: self.id,
117            prev_id: self.prev_id,
118            group_deltas: Default::default(),
119            max_committed_epoch: INVALID_EPOCH,
120            trivial_move: false,
121            new_table_watermarks: Default::default(),
122            removed_table_ids: self.removed_table_id.iter().copied().collect(),
123            change_log_delta: self
124                .change_log_delta
125                .iter()
126                .map(|(table_id, delta)| {
127                    (
128                        *table_id,
129                        PbChangeLogDelta {
130                            new_log: Some(PbEpochNewChangeLog {
131                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
132                                old_value: vec![
133                                    PbSstableInfo::default();
134                                    delta.new_log.old_value.len()
135                                ],
136                                new_value: vec![
137                                    PbSstableInfo::default();
138                                    delta.new_log.new_value.len()
139                                ],
140                                epochs: delta.new_log.epochs().collect(),
141                            }),
142                            truncate_epoch: delta.truncate_epoch,
143                        },
144                    )
145                })
146                .collect(),
147            state_table_info_delta: self.state_table_info_delta.clone(),
148            vector_index_delta: Default::default(),
149        }
150    }
151
152    pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
153        Self {
154            prev_id: delta.prev_id,
155            id: delta.id,
156            removed_table_id: delta.removed_table_ids.into_iter().collect(),
157            state_table_info_delta: delta
158                .state_table_info_delta
159                .iter()
160                .map(|(table_id, delta)| ((*table_id), *delta))
161                .collect(),
162            change_log_delta: delta
163                .change_log_delta
164                .iter()
165                .map(|(table_id, change_log_delta)| {
166                    (
167                        *table_id,
168                        ChangeLogDeltaCommon {
169                            truncate_epoch: change_log_delta.truncate_epoch,
170                            new_log: change_log_delta
171                                .new_log
172                                .as_ref()
173                                .map(|new_log| {
174                                    let (non_checkpoint_epochs, checkpoint_epoch) =
175                                        resolve_pb_log_epochs(&new_log.epochs);
176                                    EpochNewChangeLogCommon {
177                                        // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
178                                        new_value: vec![(); new_log.new_value.len()],
179                                        old_value: vec![(); new_log.old_value.len()],
180                                        non_checkpoint_epochs,
181                                        checkpoint_epoch,
182                                    }
183                                })
184                                .unwrap(),
185                        },
186                    )
187                })
188                .collect(),
189        }
190    }
191}