risingwave_hummock_sdk/
frontend_version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22    StateTableInfoDelta,
23};
24
25use crate::change_log::{
26    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon, resolve_pb_log_epochs,
27};
28use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
29use crate::{HummockVersionId, INVALID_VERSION_ID};
30
31#[derive(Clone, Debug)]
32pub struct FrontendHummockVersion {
33    pub id: HummockVersionId,
34    pub state_table_info: HummockVersionStateTableInfo,
35    pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
36}
37
38impl FrontendHummockVersion {
39    pub fn from_version(version: &HummockVersion) -> Self {
40        Self {
41            id: version.id,
42            state_table_info: version.state_table_info.clone(),
43            table_change_log: version
44                .table_change_log
45                .iter()
46                .map(|(table_id, change_log)| {
47                    (
48                        *table_id,
49                        TableChangeLogCommon::new(change_log.iter().map(|change_log| {
50                            EpochNewChangeLogCommon {
51                                new_value: vec![(); change_log.new_value.len()],
52                                old_value: vec![(); change_log.new_value.len()],
53                                non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
54                                checkpoint_epoch: change_log.checkpoint_epoch,
55                            }
56                        })),
57                    )
58                })
59                .collect(),
60        }
61    }
62
63    pub fn to_protobuf(&self) -> PbHummockVersion {
64        #[expect(deprecated)]
65        PbHummockVersion {
66            id: self.id.0,
67            levels: Default::default(),
68            max_committed_epoch: INVALID_EPOCH,
69            table_watermarks: Default::default(),
70            table_change_logs: self
71                .table_change_log
72                .iter()
73                .map(|(table_id, change_log)| {
74                    (
75                        table_id.table_id,
76                        PbTableChangeLog {
77                            change_logs: change_log
78                                .iter()
79                                .map(|change_log| PbEpochNewChangeLog {
80                                    old_value: vec![
81                                        PbSstableInfo::default();
82                                        change_log.old_value.len()
83                                    ],
84                                    new_value: vec![
85                                        PbSstableInfo::default();
86                                        change_log.new_value.len()
87                                    ],
88                                    epochs: change_log.epochs().collect(),
89                                })
90                                .collect(),
91                        },
92                    )
93                })
94                .collect(),
95            state_table_info: self.state_table_info.to_protobuf(),
96        }
97    }
98
99    pub fn from_protobuf(value: PbHummockVersion) -> Self {
100        Self {
101            id: HummockVersionId(value.id),
102            state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
103            table_change_log: value
104                .table_change_logs
105                .into_iter()
106                .map(|(table_id, change_log)| {
107                    (
108                        TableId::new(table_id),
109                        TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
110                            |change_log| {
111                                let (non_checkpoint_epochs, checkpoint_epoch) =
112                                    resolve_pb_log_epochs(&change_log.epochs);
113                                EpochNewChangeLogCommon {
114                                    // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
115                                    new_value: vec![(); change_log.new_value.len()],
116                                    old_value: vec![(); change_log.old_value.len()],
117                                    non_checkpoint_epochs,
118                                    checkpoint_epoch,
119                                }
120                            },
121                        )),
122                    )
123                })
124                .collect(),
125        }
126    }
127
128    pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
129        if self.id != INVALID_VERSION_ID {
130            assert_eq!(self.id, delta.prev_id);
131        }
132        self.id = delta.id;
133        let (changed_table_info, _) = self
134            .state_table_info
135            .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
136        HummockVersion::apply_change_log_delta(
137            &mut self.table_change_log,
138            &delta.change_log_delta,
139            &delta.removed_table_id,
140            &delta.state_table_info_delta,
141            &changed_table_info,
142        );
143    }
144}
145
146pub struct FrontendHummockVersionDelta {
147    pub prev_id: HummockVersionId,
148    pub id: HummockVersionId,
149    pub removed_table_id: HashSet<TableId>,
150    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
151    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
152}
153
154impl FrontendHummockVersionDelta {
155    pub fn from_delta(delta: &HummockVersionDelta) -> Self {
156        Self {
157            prev_id: delta.prev_id,
158            id: delta.id,
159            removed_table_id: delta.removed_table_ids.clone(),
160            state_table_info_delta: delta.state_table_info_delta.clone(),
161            change_log_delta: delta
162                .change_log_delta
163                .iter()
164                .map(|(table_id, change_log_delta)| {
165                    (
166                        *table_id,
167                        ChangeLogDeltaCommon {
168                            truncate_epoch: change_log_delta.truncate_epoch,
169                            new_log: EpochNewChangeLogCommon {
170                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
171                                new_value: vec![(); change_log_delta.new_log.new_value.len()],
172                                old_value: vec![(); change_log_delta.new_log.old_value.len()],
173                                non_checkpoint_epochs: change_log_delta
174                                    .new_log
175                                    .non_checkpoint_epochs
176                                    .clone(),
177                                checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
178                            },
179                        },
180                    )
181                })
182                .collect(),
183        }
184    }
185
186    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
187        #[expect(deprecated)]
188        PbHummockVersionDelta {
189            id: self.id.to_u64(),
190            prev_id: self.prev_id.to_u64(),
191            group_deltas: Default::default(),
192            max_committed_epoch: INVALID_EPOCH,
193            trivial_move: false,
194            new_table_watermarks: Default::default(),
195            removed_table_ids: self
196                .removed_table_id
197                .iter()
198                .map(|table_id| table_id.table_id)
199                .collect(),
200            change_log_delta: self
201                .change_log_delta
202                .iter()
203                .map(|(table_id, delta)| {
204                    (
205                        table_id.table_id,
206                        PbChangeLogDelta {
207                            new_log: Some(PbEpochNewChangeLog {
208                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
209                                old_value: vec![
210                                    PbSstableInfo::default();
211                                    delta.new_log.old_value.len()
212                                ],
213                                new_value: vec![
214                                    PbSstableInfo::default();
215                                    delta.new_log.new_value.len()
216                                ],
217                                epochs: delta.new_log.epochs().collect(),
218                            }),
219                            truncate_epoch: delta.truncate_epoch,
220                        },
221                    )
222                })
223                .collect(),
224            state_table_info_delta: self
225                .state_table_info_delta
226                .iter()
227                .map(|(table_id, delta)| (table_id.table_id, *delta))
228                .collect(),
229        }
230    }
231
232    pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
233        Self {
234            prev_id: HummockVersionId::new(delta.prev_id),
235            id: HummockVersionId::new(delta.id),
236            removed_table_id: delta
237                .removed_table_ids
238                .iter()
239                .map(|table_id| TableId::new(*table_id))
240                .collect(),
241            state_table_info_delta: delta
242                .state_table_info_delta
243                .into_iter()
244                .map(|(table_id, delta)| (TableId::new(table_id), delta))
245                .collect(),
246            change_log_delta: delta
247                .change_log_delta
248                .iter()
249                .map(|(table_id, change_log_delta)| {
250                    (
251                        TableId::new(*table_id),
252                        ChangeLogDeltaCommon {
253                            truncate_epoch: change_log_delta.truncate_epoch,
254                            new_log: change_log_delta
255                                .new_log
256                                .as_ref()
257                                .map(|new_log| {
258                                    let (non_checkpoint_epochs, checkpoint_epoch) =
259                                        resolve_pb_log_epochs(&new_log.epochs);
260                                    EpochNewChangeLogCommon {
261                                        // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
262                                        new_value: vec![(); new_log.new_value.len()],
263                                        old_value: vec![(); new_log.old_value.len()],
264                                        non_checkpoint_epochs,
265                                        checkpoint_epoch,
266                                    }
267                                })
268                                .unwrap(),
269                        },
270                    )
271                })
272                .collect(),
273        }
274    }
275}