risingwave_hummock_sdk/
change_log.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
/// A table's change log: a sequence of per-epoch logs ([`EpochNewChangeLogCommon`]).
///
/// The epochs of all logs, read front to back, are expected to be sorted
/// (`TableChangeLogCommon::new` debug-asserts this and `add_change_log` asserts it
/// for every appended log).
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(
    // older log at the front, newest log at the back
    VecDeque<EpochNewChangeLogCommon<T>>,
);
29
30impl<T> TableChangeLogCommon<T> {
31    pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32        let logs = logs.into_iter().collect::<VecDeque<_>>();
33        debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
34        Self(logs)
35    }
36
37    pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38        self.0.iter()
39    }
40
41    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42        self.0.iter_mut()
43    }
44
45    pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46        if let Some(prev_log) = self.0.back() {
47            assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
48        }
49        self.0.push_back(new_change_log);
50    }
51
52    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
53        self.0
54            .iter()
55            .flat_map(|epoch_change_log| epoch_change_log.epochs())
56    }
57
58    pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
59        self.0.into_iter()
60    }
61
62    pub(crate) fn change_log_iter_mut(
63        &mut self,
64    ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
65        self.0.iter_mut()
66    }
67}
68
/// Table change log whose per-epoch logs hold [`SstableInfo`]s.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
70
/// One change-log entry covering zero or more non-checkpoint epochs followed by a
/// single checkpoint epoch, together with the new-value and old-value items recorded
/// for those epochs.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// New-value items (SSTs for `EpochNewChangeLog`).
    pub new_value: Vec<T>,
    /// Old-value items (SSTs for `EpochNewChangeLog`).
    pub old_value: Vec<T>,
    /// Non-checkpoint epochs, sorted in ascending order.
    pub non_checkpoint_epochs: Vec<u64>,
    /// The final epoch of this entry; `epochs()` yields it after all non-checkpoint
    /// epochs, and the protobuf form stores it as the last element of `epochs`.
    pub checkpoint_epoch: u64,
}
79
/// Splits the flat epoch list of a protobuf change log into its non-checkpoint epochs
/// and its trailing checkpoint epoch.
///
/// The last element of `epochs` is the checkpoint epoch; every element before it is a
/// non-checkpoint epoch, returned in the original order.
///
/// # Panics
///
/// Panics with `"non-empty"` if `epochs` is empty.
pub(crate) fn resolve_pb_log_epochs(epochs: &[u64]) -> (Vec<u64>, u64) {
    // `split_last` returns `None` for an empty slice, so the intended "non-empty" panic
    // fires instead of a `usize` underflow in `epochs.len() - 1` (or an out-of-range
    // slice in release builds).
    let (checkpoint_epoch, non_checkpoint_epochs) = epochs.split_last().expect("non-empty");
    (non_checkpoint_epochs.to_vec(), *checkpoint_epoch)
}
86
87impl<T> EpochNewChangeLogCommon<T> {
88    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
89        self.non_checkpoint_epochs
90            .iter()
91            .cloned()
92            .chain([self.checkpoint_epoch])
93    }
94
95    pub fn first_epoch(&self) -> u64 {
96        self.non_checkpoint_epochs
97            .first()
98            .cloned()
99            .unwrap_or(self.checkpoint_epoch)
100    }
101}
102
/// Per-epoch change-log entry whose new and old values are [`SstableInfo`]s.
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
104
105impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
106where
107    PbSstableInfo: for<'a> From<&'a T>,
108{
109    fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
110        Self {
111            new_value: val.new_value.iter().map(|a| a.into()).collect(),
112            old_value: val.old_value.iter().map(|a| a.into()).collect(),
113            epochs: val.epochs().collect(),
114        }
115    }
116}
117
118impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
119where
120    T: for<'a> From<&'a PbSstableInfo>,
121{
122    fn from(value: &PbEpochNewChangeLog) -> Self {
123        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
124        Self {
125            new_value: value.new_value.iter().map(|a| a.into()).collect(),
126            old_value: value.old_value.iter().map(|a| a.into()).collect(),
127            non_checkpoint_epochs,
128            checkpoint_epoch,
129        }
130    }
131}
132
133impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
134where
135    PbSstableInfo: From<T>,
136{
137    fn from(val: EpochNewChangeLogCommon<T>) -> Self {
138        Self {
139            epochs: val.epochs().collect(),
140            new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
141            old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
142        }
143    }
144}
145
146impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
147where
148    T: From<PbSstableInfo>,
149{
150    fn from(value: PbEpochNewChangeLog) -> Self {
151        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
152        Self {
153            new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
154            old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
155            non_checkpoint_epochs,
156            checkpoint_epoch,
157        }
158    }
159}
160
161impl<T> TableChangeLogCommon<T> {
162    pub fn filter_epoch(
163        &self,
164        (min_epoch, max_epoch): (u64, u64),
165    ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
166        let start = self
167            .0
168            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
169        let end = self
170            .0
171            .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
172        self.0.range(start..end)
173    }
174
175    /// Get the `next_epoch` of the given `epoch`
176    /// Return:
177    ///     - Ok(Some(`next_epoch`)): the `next_epoch` of `epoch`
178    ///     - Ok(None): `next_epoch` of `epoch` is not added to change log yet
179    ///     - Err(()): `epoch` is not an existing or to exist one
180    #[expect(clippy::result_unit_err)]
181    pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
182        let start = self
183            .0
184            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
185        debug_assert!(
186            self.0
187                .range(start..)
188                .flat_map(|epoch_change_log| epoch_change_log.epochs())
189                .is_sorted()
190        );
191        let mut later_epochs = self
192            .0
193            .range(start..)
194            .flat_map(|epoch_change_log| epoch_change_log.epochs())
195            .skip_while(|log_epoch| *log_epoch < epoch);
196        if let Some(first_epoch) = later_epochs.next() {
197            assert!(
198                first_epoch >= epoch,
199                "first_epoch {} < epoch {}",
200                first_epoch,
201                epoch
202            );
203            if first_epoch != epoch {
204                return Err(());
205            }
206            if let Some(next_epoch) = later_epochs.next() {
207                assert!(
208                    next_epoch > epoch,
209                    "next_epoch {} not exceed epoch {}",
210                    next_epoch,
211                    epoch
212                );
213                Ok(Some(next_epoch))
214            } else {
215                // `epoch` is latest
216                Ok(None)
217            }
218        } else {
219            // all epochs are less than `epoch`
220            Ok(None)
221        }
222    }
223
224    /// Returns epochs where value is non-null and >= `min_epoch`.
225    pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
226        self.filter_epoch((min_epoch, u64::MAX))
227            .filter(|epoch_change_log| {
228                // Filter out empty change logs
229                let new_value_empty = epoch_change_log.new_value.is_empty();
230                let old_value_empty = epoch_change_log.old_value.is_empty();
231                !new_value_empty || !old_value_empty
232            })
233            .flat_map(|epoch_change_log| epoch_change_log.epochs())
234            .filter(|a| a >= &min_epoch)
235            .take(max_count)
236            .collect()
237    }
238
239    pub fn truncate(&mut self, truncate_epoch: u64) {
240        while let Some(change_log) = self.0.front()
241            && change_log.checkpoint_epoch < truncate_epoch
242        {
243            let _change_log = self.0.pop_front().expect("non-empty");
244        }
245        if let Some(first_log) = self.0.front_mut() {
246            first_log
247                .non_checkpoint_epochs
248                .retain(|epoch| *epoch >= truncate_epoch);
249        }
250    }
251}
252
253impl<T> TableChangeLogCommon<T>
254where
255    PbSstableInfo: for<'a> From<&'a T>,
256{
257    pub fn to_protobuf(&self) -> PbTableChangeLog {
258        PbTableChangeLog {
259            change_logs: self.0.iter().map(|a| a.into()).collect(),
260        }
261    }
262}
263
264impl<T> TableChangeLogCommon<T>
265where
266    T: for<'a> From<&'a PbSstableInfo>,
267{
268    pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
269        Self(val.change_logs.iter().map(|a| a.into()).collect())
270    }
271}
272
273pub fn build_table_change_log_delta<'a>(
274    old_value_ssts: impl Iterator<Item = SstableInfo>,
275    new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
276    epochs: &Vec<u64>,
277    log_store_table_ids: impl Iterator<Item = (u32, u64)>,
278) -> HashMap<TableId, ChangeLogDelta> {
279    let mut table_change_log: HashMap<_, _> = log_store_table_ids
280        .map(|(table_id, truncate_epoch)| {
281            let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
282            (
283                TableId::new(table_id),
284                ChangeLogDelta {
285                    truncate_epoch,
286                    new_log: EpochNewChangeLog {
287                        new_value: vec![],
288                        old_value: vec![],
289                        non_checkpoint_epochs,
290                        checkpoint_epoch,
291                    },
292                },
293            )
294        })
295        .collect();
296    for sst in old_value_ssts {
297        for table_id in &sst.table_ids {
298            match table_change_log.get_mut(&TableId::new(*table_id)) {
299                Some(log) => {
300                    log.new_log.old_value.push(sst.clone());
301                }
302                None => {
303                    warn!(table_id, ?sst, "old value sst contains non-log-store table");
304                }
305            }
306        }
307    }
308    for sst in new_value_ssts {
309        for table_id in &sst.table_ids {
310            if let Some(log) = table_change_log.get_mut(&TableId::new(*table_id)) {
311                log.new_log.new_value.push(sst.clone());
312            }
313        }
314    }
315    table_change_log
316}
317
/// A change-log update for one table (`build_table_change_log_delta` produces one per
/// log-store table): a new per-epoch entry plus a truncate epoch.
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon<T> {
    /// Truncate epoch supplied for the table. NOTE(review): presumably applied via
    /// `TableChangeLogCommon::truncate` when the delta is consumed — confirm at the apply site.
    pub truncate_epoch: u64,
    /// The newly produced per-epoch change-log entry.
    pub new_log: EpochNewChangeLogCommon<T>,
}
323
/// Change-log delta whose new entry holds [`SstableInfo`]s.
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
325
326impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
327where
328    PbSstableInfo: for<'a> From<&'a T>,
329{
330    fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
331        Self {
332            truncate_epoch: val.truncate_epoch,
333            new_log: Some((&val.new_log).into()),
334        }
335    }
336}
337
338impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
339where
340    T: for<'a> From<&'a PbSstableInfo>,
341{
342    fn from(val: &PbChangeLogDelta) -> Self {
343        Self {
344            truncate_epoch: val.truncate_epoch,
345            new_log: val.new_log.as_ref().unwrap().into(),
346        }
347    }
348}
349
350impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
351where
352    PbSstableInfo: From<T>,
353{
354    fn from(val: ChangeLogDeltaCommon<T>) -> Self {
355        Self {
356            truncate_epoch: val.truncate_epoch,
357            new_log: Some(val.new_log.into()),
358        }
359    }
360}
361
362impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
363where
364    T: From<PbSstableInfo>,
365{
366    fn from(val: PbChangeLogDelta) -> Self {
367        Self {
368            truncate_epoch: val.truncate_epoch,
369            new_log: val.new_log.unwrap().into(),
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds an SST-free change-log entry covering the given epochs.
    fn epoch_log(non_checkpoint_epochs: &[u64], checkpoint_epoch: u64) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            non_checkpoint_epochs: non_checkpoint_epochs.to_vec(),
            checkpoint_epoch,
        }
    }

    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            epoch_log(&[], 2),
            epoch_log(&[3], 4),
            epoch_log(&[], 6),
            epoch_log(&[8], 10),
        ]);

        let epochs = (1..=11).collect_vec();
        // Compare `filter_epoch` with a brute-force scan for every range with min <= max.
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                let expected = table_change_log
                    .iter()
                    .filter(|log| {
                        min_epoch <= log.checkpoint_epoch && log.first_epoch() <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        // Compare `next_epoch` with an oracle built from the flattened epoch list.
        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let expected = match existing_epochs
                .iter()
                .position(|existing_epoch| *existing_epoch >= epoch)
            {
                // every existing epoch is less than `epoch`
                None => Ok(None),
                // `epoch` exists: its successor, or None when it is the latest
                Some(i) if existing_epochs[i] == epoch => Ok(existing_epochs.get(i + 1).copied()),
                // `epoch` is not an existing epoch
                Some(_) => Err(()),
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            epoch_log(&[], 1),
            epoch_log(&[], 2),
            epoch_log(&[3], 4),
            epoch_log(&[], 5),
        ]);
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            // Expected: strip epochs below `truncate_epoch` from every log, then drop the
            // logs with no surviving epoch.
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .iter()
                    .cloned()
                    .filter_map(|mut epoch_change_log| {
                        epoch_change_log
                            .non_checkpoint_epochs
                            .retain(|epoch| *epoch >= truncate_epoch);
                        let fully_truncated = epoch_change_log.non_checkpoint_epochs.is_empty()
                            && epoch_change_log.checkpoint_epoch < truncate_epoch;
                        (!fully_truncated).then_some(epoch_change_log)
                    })
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}