risingwave_hummock_sdk/
frontend_version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22    StateTableInfoDelta,
23};
24
25use crate::change_log::{ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon};
26use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
27use crate::{HummockVersionId, INVALID_VERSION_ID};
28
29#[derive(Clone, Debug)]
30pub struct FrontendHummockVersion {
31    pub id: HummockVersionId,
32    pub state_table_info: HummockVersionStateTableInfo,
33    pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
34}
35
36impl FrontendHummockVersion {
37    pub fn from_version(version: &HummockVersion) -> Self {
38        Self {
39            id: version.id,
40            state_table_info: version.state_table_info.clone(),
41            table_change_log: version
42                .table_change_log
43                .iter()
44                .map(|(table_id, change_log)| {
45                    (
46                        *table_id,
47                        TableChangeLogCommon::new(change_log.iter().map(|change_log| {
48                            EpochNewChangeLogCommon {
49                                new_value: vec![(); change_log.new_value.len()],
50                                old_value: vec![(); change_log.new_value.len()],
51                                epochs: change_log.epochs.clone(),
52                            }
53                        })),
54                    )
55                })
56                .collect(),
57        }
58    }
59
60    pub fn to_protobuf(&self) -> PbHummockVersion {
61        #[expect(deprecated)]
62        PbHummockVersion {
63            id: self.id.0,
64            levels: Default::default(),
65            max_committed_epoch: INVALID_EPOCH,
66            table_watermarks: Default::default(),
67            table_change_logs: self
68                .table_change_log
69                .iter()
70                .map(|(table_id, change_log)| {
71                    (
72                        table_id.table_id,
73                        PbTableChangeLog {
74                            change_logs: change_log
75                                .iter()
76                                .map(|change_log| PbEpochNewChangeLog {
77                                    old_value: vec![
78                                        PbSstableInfo::default();
79                                        change_log.old_value.len()
80                                    ],
81                                    new_value: vec![
82                                        PbSstableInfo::default();
83                                        change_log.new_value.len()
84                                    ],
85                                    epochs: change_log.epochs.clone(),
86                                })
87                                .collect(),
88                        },
89                    )
90                })
91                .collect(),
92            state_table_info: self.state_table_info.to_protobuf(),
93        }
94    }
95
96    pub fn from_protobuf(value: PbHummockVersion) -> Self {
97        Self {
98            id: HummockVersionId(value.id),
99            state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
100            table_change_log: value
101                .table_change_logs
102                .into_iter()
103                .map(|(table_id, change_log)| {
104                    (
105                        TableId::new(table_id),
106                        TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
107                            |change_log| EpochNewChangeLogCommon {
108                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
109                                new_value: vec![(); change_log.new_value.len()],
110                                old_value: vec![(); change_log.old_value.len()],
111                                epochs: change_log.epochs,
112                            },
113                        )),
114                    )
115                })
116                .collect(),
117        }
118    }
119
120    pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
121        if self.id != INVALID_VERSION_ID {
122            assert_eq!(self.id, delta.prev_id);
123        }
124        self.id = delta.id;
125        let (changed_table_info, _) = self
126            .state_table_info
127            .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
128        HummockVersion::apply_change_log_delta(
129            &mut self.table_change_log,
130            &delta.change_log_delta,
131            &delta.removed_table_id,
132            &delta.state_table_info_delta,
133            &changed_table_info,
134        );
135    }
136}
137
138pub struct FrontendHummockVersionDelta {
139    pub prev_id: HummockVersionId,
140    pub id: HummockVersionId,
141    pub removed_table_id: HashSet<TableId>,
142    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
143    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
144}
145
146impl FrontendHummockVersionDelta {
147    pub fn from_delta(delta: &HummockVersionDelta) -> Self {
148        Self {
149            prev_id: delta.prev_id,
150            id: delta.id,
151            removed_table_id: delta.removed_table_ids.clone(),
152            state_table_info_delta: delta.state_table_info_delta.clone(),
153            change_log_delta: delta
154                .change_log_delta
155                .iter()
156                .map(|(table_id, change_log_delta)| {
157                    (
158                        *table_id,
159                        ChangeLogDeltaCommon {
160                            truncate_epoch: change_log_delta.truncate_epoch,
161                            new_log: EpochNewChangeLogCommon {
162                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
163                                new_value: vec![(); change_log_delta.new_log.new_value.len()],
164                                old_value: vec![(); change_log_delta.new_log.old_value.len()],
165                                epochs: change_log_delta.new_log.epochs.clone(),
166                            },
167                        },
168                    )
169                })
170                .collect(),
171        }
172    }
173
174    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
175        #[expect(deprecated)]
176        PbHummockVersionDelta {
177            id: self.id.to_u64(),
178            prev_id: self.prev_id.to_u64(),
179            group_deltas: Default::default(),
180            max_committed_epoch: INVALID_EPOCH,
181            trivial_move: false,
182            new_table_watermarks: Default::default(),
183            removed_table_ids: self
184                .removed_table_id
185                .iter()
186                .map(|table_id| table_id.table_id)
187                .collect(),
188            change_log_delta: self
189                .change_log_delta
190                .iter()
191                .map(|(table_id, delta)| {
192                    (
193                        table_id.table_id,
194                        PbChangeLogDelta {
195                            new_log: Some(PbEpochNewChangeLog {
196                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
197                                old_value: vec![
198                                    PbSstableInfo::default();
199                                    delta.new_log.old_value.len()
200                                ],
201                                new_value: vec![
202                                    PbSstableInfo::default();
203                                    delta.new_log.new_value.len()
204                                ],
205                                epochs: delta.new_log.epochs.clone(),
206                            }),
207                            truncate_epoch: delta.truncate_epoch,
208                        },
209                    )
210                })
211                .collect(),
212            state_table_info_delta: self
213                .state_table_info_delta
214                .iter()
215                .map(|(table_id, delta)| (table_id.table_id, *delta))
216                .collect(),
217        }
218    }
219
220    pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
221        Self {
222            prev_id: HummockVersionId::new(delta.prev_id),
223            id: HummockVersionId::new(delta.id),
224            removed_table_id: delta
225                .removed_table_ids
226                .iter()
227                .map(|table_id| TableId::new(*table_id))
228                .collect(),
229            state_table_info_delta: delta
230                .state_table_info_delta
231                .into_iter()
232                .map(|(table_id, delta)| (TableId::new(table_id), delta))
233                .collect(),
234            change_log_delta: delta
235                .change_log_delta
236                .iter()
237                .map(|(table_id, change_log_delta)| {
238                    (
239                        TableId::new(*table_id),
240                        ChangeLogDeltaCommon {
241                            truncate_epoch: change_log_delta.truncate_epoch,
242                            new_log: change_log_delta
243                                .new_log
244                                .as_ref()
245                                .map(|new_log| {
246                                    EpochNewChangeLogCommon {
247                                        // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
248                                        new_value: vec![(); new_log.new_value.len()],
249                                        old_value: vec![(); new_log.old_value.len()],
250                                        epochs: new_log.epochs.clone(),
251                                    }
252                                })
253                                .unwrap(),
254                        },
255                    )
256                })
257                .collect(),
258        }
259    }
260}