risingwave_hummock_sdk/
frontend_version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::TableId;
18use risingwave_common::util::epoch::INVALID_EPOCH;
19use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
20use risingwave_pb::hummock::{
21    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
22    StateTableInfoDelta,
23};
24
25use crate::change_log::{
26    ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLogCommon, resolve_pb_log_epochs,
27};
28use crate::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
29use crate::{HummockVersionId, INVALID_VERSION_ID};
30
31#[derive(Clone, Debug)]
32pub struct FrontendHummockVersion {
33    pub id: HummockVersionId,
34    pub state_table_info: HummockVersionStateTableInfo,
35    pub table_change_log: HashMap<TableId, TableChangeLogCommon<()>>,
36}
37
38impl FrontendHummockVersion {
39    pub fn from_version(version: &HummockVersion) -> Self {
40        Self {
41            id: version.id,
42            state_table_info: version.state_table_info.clone(),
43            table_change_log: version
44                .table_change_log
45                .iter()
46                .map(|(table_id, change_log)| {
47                    (
48                        *table_id,
49                        TableChangeLogCommon::new(change_log.iter().map(|change_log| {
50                            EpochNewChangeLogCommon {
51                                new_value: vec![(); change_log.new_value.len()],
52                                old_value: vec![(); change_log.new_value.len()],
53                                non_checkpoint_epochs: change_log.non_checkpoint_epochs.clone(),
54                                checkpoint_epoch: change_log.checkpoint_epoch,
55                            }
56                        })),
57                    )
58                })
59                .collect(),
60        }
61    }
62
63    pub fn to_protobuf(&self) -> PbHummockVersion {
64        #[expect(deprecated)]
65        PbHummockVersion {
66            id: self.id.0,
67            levels: Default::default(),
68            max_committed_epoch: INVALID_EPOCH,
69            table_watermarks: Default::default(),
70            table_change_logs: self
71                .table_change_log
72                .iter()
73                .map(|(table_id, change_log)| {
74                    (
75                        table_id.table_id,
76                        PbTableChangeLog {
77                            change_logs: change_log
78                                .iter()
79                                .map(|change_log| PbEpochNewChangeLog {
80                                    old_value: vec![
81                                        PbSstableInfo::default();
82                                        change_log.old_value.len()
83                                    ],
84                                    new_value: vec![
85                                        PbSstableInfo::default();
86                                        change_log.new_value.len()
87                                    ],
88                                    epochs: change_log.epochs().collect(),
89                                })
90                                .collect(),
91                        },
92                    )
93                })
94                .collect(),
95            state_table_info: self.state_table_info.to_protobuf(),
96            vector_indexes: Default::default(),
97        }
98    }
99
100    pub fn from_protobuf(value: PbHummockVersion) -> Self {
101        Self {
102            id: HummockVersionId(value.id),
103            state_table_info: HummockVersionStateTableInfo::from_protobuf(&value.state_table_info),
104            table_change_log: value
105                .table_change_logs
106                .into_iter()
107                .map(|(table_id, change_log)| {
108                    (
109                        TableId::new(table_id),
110                        TableChangeLogCommon::new(change_log.change_logs.into_iter().map(
111                            |change_log| {
112                                let (non_checkpoint_epochs, checkpoint_epoch) =
113                                    resolve_pb_log_epochs(&change_log.epochs);
114                                EpochNewChangeLogCommon {
115                                    // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
116                                    new_value: vec![(); change_log.new_value.len()],
117                                    old_value: vec![(); change_log.old_value.len()],
118                                    non_checkpoint_epochs,
119                                    checkpoint_epoch,
120                                }
121                            },
122                        )),
123                    )
124                })
125                .collect(),
126        }
127    }
128
129    pub fn apply_delta(&mut self, delta: FrontendHummockVersionDelta) {
130        if self.id != INVALID_VERSION_ID {
131            assert_eq!(self.id, delta.prev_id);
132        }
133        self.id = delta.id;
134        let (changed_table_info, _) = self
135            .state_table_info
136            .apply_delta(&delta.state_table_info_delta, &delta.removed_table_id);
137        HummockVersion::apply_change_log_delta(
138            &mut self.table_change_log,
139            &delta.change_log_delta,
140            &delta.removed_table_id,
141            &delta.state_table_info_delta,
142            &changed_table_info,
143        );
144    }
145}
146
147pub struct FrontendHummockVersionDelta {
148    pub prev_id: HummockVersionId,
149    pub id: HummockVersionId,
150    pub removed_table_id: HashSet<TableId>,
151    pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
152    pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<()>>,
153}
154
155impl FrontendHummockVersionDelta {
156    pub fn from_delta(delta: &HummockVersionDelta) -> Self {
157        Self {
158            prev_id: delta.prev_id,
159            id: delta.id,
160            removed_table_id: delta.removed_table_ids.clone(),
161            state_table_info_delta: delta.state_table_info_delta.clone(),
162            change_log_delta: delta
163                .change_log_delta
164                .iter()
165                .map(|(table_id, change_log_delta)| {
166                    (
167                        *table_id,
168                        ChangeLogDeltaCommon {
169                            truncate_epoch: change_log_delta.truncate_epoch,
170                            new_log: EpochNewChangeLogCommon {
171                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
172                                new_value: vec![(); change_log_delta.new_log.new_value.len()],
173                                old_value: vec![(); change_log_delta.new_log.old_value.len()],
174                                non_checkpoint_epochs: change_log_delta
175                                    .new_log
176                                    .non_checkpoint_epochs
177                                    .clone(),
178                                checkpoint_epoch: change_log_delta.new_log.checkpoint_epoch,
179                            },
180                        },
181                    )
182                })
183                .collect(),
184        }
185    }
186
187    pub fn to_protobuf(&self) -> PbHummockVersionDelta {
188        #[expect(deprecated)]
189        PbHummockVersionDelta {
190            id: self.id.to_u64(),
191            prev_id: self.prev_id.to_u64(),
192            group_deltas: Default::default(),
193            max_committed_epoch: INVALID_EPOCH,
194            trivial_move: false,
195            new_table_watermarks: Default::default(),
196            removed_table_ids: self
197                .removed_table_id
198                .iter()
199                .map(|table_id| table_id.table_id)
200                .collect(),
201            change_log_delta: self
202                .change_log_delta
203                .iter()
204                .map(|(table_id, delta)| {
205                    (
206                        table_id.table_id,
207                        PbChangeLogDelta {
208                            new_log: Some(PbEpochNewChangeLog {
209                                // Here we need to determine if value is null but don't care what the value is, so we fill him in using `PbSstableInfo::default()`
210                                old_value: vec![
211                                    PbSstableInfo::default();
212                                    delta.new_log.old_value.len()
213                                ],
214                                new_value: vec![
215                                    PbSstableInfo::default();
216                                    delta.new_log.new_value.len()
217                                ],
218                                epochs: delta.new_log.epochs().collect(),
219                            }),
220                            truncate_epoch: delta.truncate_epoch,
221                        },
222                    )
223                })
224                .collect(),
225            state_table_info_delta: self
226                .state_table_info_delta
227                .iter()
228                .map(|(table_id, delta)| (table_id.table_id, *delta))
229                .collect(),
230            vector_index_delta: Default::default(),
231        }
232    }
233
234    pub fn from_protobuf(delta: PbHummockVersionDelta) -> Self {
235        Self {
236            prev_id: HummockVersionId::new(delta.prev_id),
237            id: HummockVersionId::new(delta.id),
238            removed_table_id: delta
239                .removed_table_ids
240                .iter()
241                .map(|table_id| TableId::new(*table_id))
242                .collect(),
243            state_table_info_delta: delta
244                .state_table_info_delta
245                .into_iter()
246                .map(|(table_id, delta)| (TableId::new(table_id), delta))
247                .collect(),
248            change_log_delta: delta
249                .change_log_delta
250                .iter()
251                .map(|(table_id, change_log_delta)| {
252                    (
253                        TableId::new(*table_id),
254                        ChangeLogDeltaCommon {
255                            truncate_epoch: change_log_delta.truncate_epoch,
256                            new_log: change_log_delta
257                                .new_log
258                                .as_ref()
259                                .map(|new_log| {
260                                    let (non_checkpoint_epochs, checkpoint_epoch) =
261                                        resolve_pb_log_epochs(&new_log.epochs);
262                                    EpochNewChangeLogCommon {
263                                        // Here we need to determine if value is null but don't care what the value is, so we fill him in using `()`
264                                        new_value: vec![(); new_log.new_value.len()],
265                                        old_value: vec![(); new_log.old_value.len()],
266                                        non_checkpoint_epochs,
267                                        checkpoint_epoch,
268                                    }
269                                })
270                                .unwrap(),
271                        },
272                    )
273                })
274                .collect(),
275        }
276    }
277}