risingwave_hummock_sdk/
change_log.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
24#[derive(Debug, Clone, PartialEq)]
25pub struct TableChangeLogCommon<T>(
26    // older log at the front
27    VecDeque<EpochNewChangeLogCommon<T>>,
28);
29
30impl<T> TableChangeLogCommon<T> {
31    pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32        let logs = logs.into_iter().collect::<VecDeque<_>>();
33        debug_assert!(logs.iter().flat_map(|log| log.epochs.iter()).is_sorted());
34        Self(logs)
35    }
36
37    pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38        self.0.iter()
39    }
40
41    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42        self.0.iter_mut()
43    }
44
45    pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46        if let Some(prev_log) = self.0.back() {
47            assert!(
48                prev_log.epochs.last().expect("non-empty")
49                    < new_change_log.epochs.first().expect("non-empty")
50            );
51        }
52        self.0.push_back(new_change_log);
53    }
54
55    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
56        self.0
57            .iter()
58            .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
59            .cloned()
60    }
61
62    pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
63        self.0.into_iter()
64    }
65
66    pub(crate) fn change_log_iter_mut(
67        &mut self,
68    ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
69        self.0.iter_mut()
70    }
71}
72
/// [`TableChangeLogCommon`] whose entries store [`SstableInfo`] values.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
74
/// One change log entry: the new-value and old-value items recorded for a group of epochs.
#[derive(Clone, Debug, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// Items holding the new values of this entry.
    pub new_value: Vec<T>,
    /// Items holding the old values of this entry.
    pub old_value: Vec<T>,
    /// Epochs covered by this entry, sorted in ascending order.
    pub epochs: Vec<u64>,
}
82
/// [`EpochNewChangeLogCommon`] whose value lists hold [`SstableInfo`] items.
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
84
85impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
86where
87    PbSstableInfo: for<'a> From<&'a T>,
88{
89    fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
90        Self {
91            new_value: val.new_value.iter().map(|a| a.into()).collect(),
92            old_value: val.old_value.iter().map(|a| a.into()).collect(),
93            epochs: val.epochs.clone(),
94        }
95    }
96}
97
98impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
99where
100    T: for<'a> From<&'a PbSstableInfo>,
101{
102    fn from(value: &PbEpochNewChangeLog) -> Self {
103        Self {
104            new_value: value.new_value.iter().map(|a| a.into()).collect(),
105            old_value: value.old_value.iter().map(|a| a.into()).collect(),
106            epochs: value.epochs.clone(),
107        }
108    }
109}
110
111impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
112where
113    PbSstableInfo: From<T>,
114{
115    fn from(val: EpochNewChangeLogCommon<T>) -> Self {
116        Self {
117            new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
118            old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
119            epochs: val.epochs,
120        }
121    }
122}
123
124impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
125where
126    T: From<PbSstableInfo>,
127{
128    fn from(value: PbEpochNewChangeLog) -> Self {
129        Self {
130            new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
131            old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
132            epochs: value.epochs,
133        }
134    }
135}
136
137impl<T> TableChangeLogCommon<T> {
138    pub fn filter_epoch(
139        &self,
140        (min_epoch, max_epoch): (u64, u64),
141    ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
142        let start = self.0.partition_point(|epoch_change_log| {
143            epoch_change_log.epochs.last().expect("non-empty") < &min_epoch
144        });
145        let end = self.0.partition_point(|epoch_change_log| {
146            epoch_change_log.epochs.first().expect("non-empty") <= &max_epoch
147        });
148        self.0.range(start..end)
149    }
150
151    /// Get the `next_epoch` of the given `epoch`
152    /// Return:
153    ///     - Ok(Some(`next_epoch`)): the `next_epoch` of `epoch`
154    ///     - Ok(None): `next_epoch` of `epoch` is not added to change log yet
155    ///     - Err(()): `epoch` is not an existing or to exist one
156    #[expect(clippy::result_unit_err)]
157    pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
158        let start = self.0.partition_point(|epoch_change_log| {
159            epoch_change_log.epochs.last().expect("non-empty") < &epoch
160        });
161        debug_assert!(
162            self.0
163                .range(start..)
164                .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
165                .is_sorted()
166        );
167        let mut later_epochs = self
168            .0
169            .range(start..)
170            .flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
171            .skip_while(|log_epoch| **log_epoch < epoch);
172        if let Some(first_epoch) = later_epochs.next() {
173            assert!(
174                *first_epoch >= epoch,
175                "first_epoch {} < epoch {}",
176                first_epoch,
177                epoch
178            );
179            if *first_epoch != epoch {
180                return Err(());
181            }
182            if let Some(next_epoch) = later_epochs.next() {
183                assert!(
184                    *next_epoch > epoch,
185                    "next_epoch {} not exceed epoch {}",
186                    next_epoch,
187                    epoch
188                );
189                Ok(Some(*next_epoch))
190            } else {
191                // `epoch` is latest
192                Ok(None)
193            }
194        } else {
195            // all epochs are less than `epoch`
196            Ok(None)
197        }
198    }
199
200    /// Returns epochs where value is non-null and >= `min_epoch`.
201    pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
202        self.filter_epoch((min_epoch, u64::MAX))
203            .filter(|epoch_change_log| {
204                // Filter out empty change logs
205                let new_value_empty = epoch_change_log.new_value.is_empty();
206                let old_value_empty = epoch_change_log.old_value.is_empty();
207                !new_value_empty || !old_value_empty
208            })
209            .flat_map(|epoch_change_log| epoch_change_log.epochs.iter().cloned())
210            .filter(|a| a >= &min_epoch)
211            .take(max_count)
212            .collect()
213    }
214
215    pub fn truncate(&mut self, truncate_epoch: u64) {
216        while let Some(change_log) = self.0.front()
217            && *change_log.epochs.last().expect("non-empty") < truncate_epoch
218        {
219            let _change_log = self.0.pop_front().expect("non-empty");
220        }
221        if let Some(first_log) = self.0.front_mut() {
222            first_log.epochs.retain(|epoch| *epoch >= truncate_epoch);
223        }
224    }
225}
226
227impl<T> TableChangeLogCommon<T>
228where
229    PbSstableInfo: for<'a> From<&'a T>,
230{
231    pub fn to_protobuf(&self) -> PbTableChangeLog {
232        PbTableChangeLog {
233            change_logs: self.0.iter().map(|a| a.into()).collect(),
234        }
235    }
236}
237
238impl<T> TableChangeLogCommon<T>
239where
240    T: for<'a> From<&'a PbSstableInfo>,
241{
242    pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
243        Self(val.change_logs.iter().map(|a| a.into()).collect())
244    }
245}
246
247pub fn build_table_change_log_delta<'a>(
248    old_value_ssts: impl Iterator<Item = SstableInfo>,
249    new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
250    epochs: &Vec<u64>,
251    log_store_table_ids: impl Iterator<Item = (u32, u64)>,
252) -> HashMap<TableId, ChangeLogDelta> {
253    let mut table_change_log: HashMap<_, _> = log_store_table_ids
254        .map(|(table_id, truncate_epoch)| {
255            (
256                TableId::new(table_id),
257                ChangeLogDelta {
258                    truncate_epoch,
259                    new_log: EpochNewChangeLog {
260                        new_value: vec![],
261                        old_value: vec![],
262                        epochs: epochs.clone(),
263                    },
264                },
265            )
266        })
267        .collect();
268    for sst in old_value_ssts {
269        for table_id in &sst.table_ids {
270            match table_change_log.get_mut(&TableId::new(*table_id)) {
271                Some(log) => {
272                    log.new_log.old_value.push(sst.clone());
273                }
274                None => {
275                    warn!(table_id, ?sst, "old value sst contains non-log-store table");
276                }
277            }
278        }
279    }
280    for sst in new_value_ssts {
281        for table_id in &sst.table_ids {
282            if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
283                log.new_log.new_value.push(sst.clone());
284            }
285        }
286    }
287    table_change_log
288}
289
290#[derive(Debug, PartialEq, Clone)]
291pub struct ChangeLogDeltaCommon<T> {
292    pub truncate_epoch: u64,
293    pub new_log: EpochNewChangeLogCommon<T>,
294}
295
/// [`ChangeLogDeltaCommon`] whose new log holds [`SstableInfo`] items.
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
297
298impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
299where
300    PbSstableInfo: for<'a> From<&'a T>,
301{
302    fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
303        Self {
304            truncate_epoch: val.truncate_epoch,
305            new_log: Some((&val.new_log).into()),
306        }
307    }
308}
309
310impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
311where
312    T: for<'a> From<&'a PbSstableInfo>,
313{
314    fn from(val: &PbChangeLogDelta) -> Self {
315        Self {
316            truncate_epoch: val.truncate_epoch,
317            new_log: val.new_log.as_ref().unwrap().into(),
318        }
319    }
320}
321
322impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
323where
324    PbSstableInfo: From<T>,
325{
326    fn from(val: ChangeLogDeltaCommon<T>) -> Self {
327        Self {
328            truncate_epoch: val.truncate_epoch,
329            new_log: Some(val.new_log.into()),
330        }
331    }
332}
333
334impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
335where
336    T: From<PbSstableInfo>,
337{
338    fn from(val: PbChangeLogDelta) -> Self {
339        Self {
340            truncate_epoch: val.truncate_epoch,
341            new_log: val.new_log.unwrap().into(),
342        }
343    }
344}
345
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a change log entry that carries no SSTs and covers `epochs`.
    fn empty_log(epochs: Vec<u64>) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            epochs,
        }
    }

    /// Checks `filter_epoch` and `next_epoch` against brute-force expectations for every
    /// epoch (pair) in `1..=11`.
    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new(
            [vec![2], vec![3, 4], vec![6], vec![8, 10]].map(empty_log),
        );

        let epochs = (1..=11).collect_vec();

        // Every `(min_epoch, max_epoch)` pair with `min_epoch <= max_epoch`.
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        let first = *log.epochs.first().unwrap();
                        let last = *log.epochs.last().unwrap();
                        last >= min_epoch && first <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let first_not_less = existing_epochs
                .iter()
                .position(|existing_epoch| *existing_epoch >= epoch);
            let expected = match first_not_less {
                // all existing epochs are less than epoch
                None => Ok(None),
                // epoch is not an existing epoch
                Some(i) if existing_epochs[i] != epoch => Err(()),
                // epoch exists: the next one, or `None` when epoch is the latest epoch
                Some(i) => Ok(existing_epochs.get(i + 1).copied()),
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    /// Truncates the same log at increasing epochs and compares it with the original log
    /// filtered epoch by epoch.
    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new(
            [vec![1], vec![2], vec![3, 4], vec![5]].map(empty_log),
        );
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            // Expected: drop the truncated epochs from every original entry and discard
            // the entries left without any epoch.
            let surviving_logs = origin_table_change_log
                .0
                .iter()
                .cloned()
                .map(|mut epoch_change_log| {
                    epoch_change_log
                        .epochs
                        .retain(|epoch| *epoch >= truncate_epoch);
                    epoch_change_log
                })
                .filter(|epoch_change_log| !epoch_change_log.epochs.is_empty())
                .collect();
            let expected_table_change_log = TableChangeLogCommon(surviving_logs);
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}