risingwave_hummock_sdk/
change_log.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::sstable_info::SstableInfo;
23
/// Change log of a table, stored as a queue of per-checkpoint entries.
///
/// Entries are kept in epoch order: appending asserts that a new entry starts after the
/// checkpoint epoch of the current last entry.
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(
    // older log at the front
    VecDeque<EpochNewChangeLogCommon<T>>,
);
29
30impl<T> TableChangeLogCommon<T> {
31    pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
32        let logs = logs.into_iter().collect::<VecDeque<_>>();
33        debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
34        Self(logs)
35    }
36
37    pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
38        self.0.iter()
39    }
40
41    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
42        self.0.iter_mut()
43    }
44
45    pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
46        if let Some(prev_log) = self.0.back() {
47            assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
48        }
49        self.0.push_back(new_change_log);
50    }
51
52    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
53        self.0
54            .iter()
55            .flat_map(|epoch_change_log| epoch_change_log.epochs())
56    }
57
58    pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
59        self.0.into_iter()
60    }
61
62    pub(crate) fn change_log_iter_mut(
63        &mut self,
64    ) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
65        self.0.iter_mut()
66    }
67}
68
/// [`TableChangeLogCommon`] whose entries hold [`SstableInfo`] items.
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
70
/// One change log entry: the new-value and old-value items recorded for a run of epochs that
/// ends at `checkpoint_epoch`.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// New-value items of this entry (SSTs when `T` is `SstableInfo`).
    pub new_value: Vec<T>,
    /// Old-value items of this entry (SSTs when `T` is `SstableInfo`).
    pub old_value: Vec<T>,
    // epochs are sorted in ascending order
    /// Epochs of this entry other than the checkpoint epoch.
    pub non_checkpoint_epochs: Vec<u64>,
    /// Last epoch of this entry; it follows the non-checkpoint epochs when the epochs are
    /// listed.
    pub checkpoint_epoch: u64,
}
79
/// Splits a flat epoch list into `(non_checkpoint_epochs, checkpoint_epoch)`.
///
/// The last element of `epochs` is the checkpoint epoch; all preceding elements are returned,
/// in their original order, as the non-checkpoint epochs.
///
/// # Panics
///
/// Panics with the message `non-empty` if `epochs` is empty.
pub(crate) fn resolve_pb_log_epochs(epochs: &Vec<u64>) -> (Vec<u64>, u64) {
    // `split_last` checks for emptiness up front, so an empty list reaches the `expect` below
    // instead of failing earlier on `len() - 1` or an out-of-range slice.
    let (checkpoint_epoch, non_checkpoint_epochs) = epochs.split_last().expect("non-empty");
    (non_checkpoint_epochs.to_vec(), *checkpoint_epoch)
}
86
87impl<T> EpochNewChangeLogCommon<T> {
88    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
89        self.non_checkpoint_epochs
90            .iter()
91            .cloned()
92            .chain([self.checkpoint_epoch])
93    }
94
95    pub fn first_epoch(&self) -> u64 {
96        self.non_checkpoint_epochs
97            .first()
98            .cloned()
99            .unwrap_or(self.checkpoint_epoch)
100    }
101}
102
/// [`EpochNewChangeLogCommon`] whose new-value and old-value items are [`SstableInfo`].
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
104
105impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
106where
107    PbSstableInfo: for<'a> From<&'a T>,
108{
109    fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
110        Self {
111            new_value: val.new_value.iter().map(|a| a.into()).collect(),
112            old_value: val.old_value.iter().map(|a| a.into()).collect(),
113            epochs: val.epochs().collect(),
114        }
115    }
116}
117
118impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
119where
120    T: for<'a> From<&'a PbSstableInfo>,
121{
122    fn from(value: &PbEpochNewChangeLog) -> Self {
123        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
124        Self {
125            new_value: value.new_value.iter().map(|a| a.into()).collect(),
126            old_value: value.old_value.iter().map(|a| a.into()).collect(),
127            non_checkpoint_epochs,
128            checkpoint_epoch,
129        }
130    }
131}
132
133impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
134where
135    PbSstableInfo: From<T>,
136{
137    fn from(val: EpochNewChangeLogCommon<T>) -> Self {
138        Self {
139            epochs: val.epochs().collect(),
140            new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
141            old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
142        }
143    }
144}
145
146impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
147where
148    T: From<PbSstableInfo>,
149{
150    fn from(value: PbEpochNewChangeLog) -> Self {
151        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
152        Self {
153            new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
154            old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
155            non_checkpoint_epochs,
156            checkpoint_epoch,
157        }
158    }
159}
160
161impl<T> TableChangeLogCommon<T> {
162    pub fn filter_epoch(
163        &self,
164        (min_epoch, max_epoch): (u64, u64),
165    ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
166        let start = self
167            .0
168            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
169        let end = self
170            .0
171            .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
172        self.0.range(start..end)
173    }
174
175    /// Get the `next_epoch` of the given `epoch`
176    /// Return:
177    ///     - Ok(Some(`next_epoch`)): the `next_epoch` of `epoch`
178    ///     - Ok(None): `next_epoch` of `epoch` is not added to change log yet
179    ///     - Err(()): `epoch` is not an existing or to exist one
180    #[expect(clippy::result_unit_err)]
181    pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
182        let start = self
183            .0
184            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
185        debug_assert!(
186            self.0
187                .range(start..)
188                .flat_map(|epoch_change_log| epoch_change_log.epochs())
189                .is_sorted()
190        );
191        let mut later_epochs = self
192            .0
193            .range(start..)
194            .flat_map(|epoch_change_log| epoch_change_log.epochs())
195            .skip_while(|log_epoch| *log_epoch < epoch);
196        if let Some(first_epoch) = later_epochs.next() {
197            assert!(
198                first_epoch >= epoch,
199                "first_epoch {} < epoch {}",
200                first_epoch,
201                epoch
202            );
203            if first_epoch != epoch {
204                return Err(());
205            }
206            if let Some(next_epoch) = later_epochs.next() {
207                assert!(
208                    next_epoch > epoch,
209                    "next_epoch {} not exceed epoch {}",
210                    next_epoch,
211                    epoch
212                );
213                Ok(Some(next_epoch))
214            } else {
215                // `epoch` is latest
216                Ok(None)
217            }
218        } else {
219            // all epochs are less than `epoch`
220            Ok(None)
221        }
222    }
223
224    /// Returns epochs where value is non-null and >= `min_epoch`.
225    pub fn get_non_empty_epochs(&self, min_epoch: u64, max_count: usize) -> Vec<u64> {
226        self.filter_epoch((min_epoch, u64::MAX))
227            .filter(|epoch_change_log| {
228                // Filter out empty change logs
229                let new_value_empty = epoch_change_log.new_value.is_empty();
230                let old_value_empty = epoch_change_log.old_value.is_empty();
231                !new_value_empty || !old_value_empty
232            })
233            .flat_map(|epoch_change_log| epoch_change_log.epochs())
234            .filter(|a| a >= &min_epoch)
235            .take(max_count)
236            .collect()
237    }
238
239    pub fn truncate(&mut self, truncate_epoch: u64) {
240        while let Some(change_log) = self.0.front()
241            && change_log.checkpoint_epoch < truncate_epoch
242        {
243            let _change_log = self.0.pop_front().expect("non-empty");
244        }
245        if let Some(first_log) = self.0.front_mut() {
246            first_log
247                .non_checkpoint_epochs
248                .retain(|epoch| *epoch >= truncate_epoch);
249        }
250    }
251}
252
253impl<T> TableChangeLogCommon<T>
254where
255    PbSstableInfo: for<'a> From<&'a T>,
256{
257    pub fn to_protobuf(&self) -> PbTableChangeLog {
258        PbTableChangeLog {
259            change_logs: self.0.iter().map(|a| a.into()).collect(),
260        }
261    }
262}
263
264impl<T> TableChangeLogCommon<T>
265where
266    T: for<'a> From<&'a PbSstableInfo>,
267{
268    pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
269        Self(val.change_logs.iter().map(|a| a.into()).collect())
270    }
271}
272
273impl<T> TableChangeLogCommon<T>
274where
275    T: From<PbSstableInfo>,
276{
277    pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
278        Self(val.change_logs.into_iter().map(|a| a.into()).collect())
279    }
280}
281
282pub fn build_table_change_log_delta<'a>(
283    old_value_ssts: impl Iterator<Item = SstableInfo>,
284    new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
285    epochs: &Vec<u64>,
286    log_store_table_ids: impl Iterator<Item = (TableId, u64)>,
287) -> HashMap<TableId, ChangeLogDelta> {
288    let mut table_change_log: HashMap<_, _> = log_store_table_ids
289        .map(|(table_id, truncate_epoch)| {
290            let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
291            (
292                table_id,
293                ChangeLogDelta {
294                    truncate_epoch,
295                    new_log: EpochNewChangeLog {
296                        new_value: vec![],
297                        old_value: vec![],
298                        non_checkpoint_epochs,
299                        checkpoint_epoch,
300                    },
301                },
302            )
303        })
304        .collect();
305    for sst in old_value_ssts {
306        for table_id in &sst.table_ids {
307            match table_change_log.get_mut(table_id) {
308                Some(log) => {
309                    log.new_log.old_value.push(sst.clone());
310                }
311                None => {
312                    warn!(%table_id, ?sst, "old value sst contains non-log-store table");
313                }
314            }
315        }
316    }
317    for sst in new_value_ssts {
318        for table_id in &sst.table_ids {
319            if let Some(log) = table_change_log.get_mut(table_id) {
320                log.new_log.new_value.push(sst.clone());
321            }
322        }
323    }
324    table_change_log
325}
326
/// Change to a table's change log: a truncation epoch paired with one new entry.
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon<T> {
    /// Truncation epoch carried by this delta.
    // NOTE(review): presumably passed to `TableChangeLogCommon::truncate` when the delta is
    // applied — confirm against the code that applies deltas.
    pub truncate_epoch: u64,
    /// The new change log entry carried by this delta.
    pub new_log: EpochNewChangeLogCommon<T>,
}
332
/// [`ChangeLogDeltaCommon`] whose new log holds [`SstableInfo`] items.
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
334
335impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
336where
337    PbSstableInfo: for<'a> From<&'a T>,
338{
339    fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
340        Self {
341            truncate_epoch: val.truncate_epoch,
342            new_log: Some((&val.new_log).into()),
343        }
344    }
345}
346
347impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
348where
349    T: for<'a> From<&'a PbSstableInfo>,
350{
351    fn from(val: &PbChangeLogDelta) -> Self {
352        Self {
353            truncate_epoch: val.truncate_epoch,
354            new_log: val.new_log.as_ref().unwrap().into(),
355        }
356    }
357}
358
359impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
360where
361    PbSstableInfo: From<T>,
362{
363    fn from(val: ChangeLogDeltaCommon<T>) -> Self {
364        Self {
365            truncate_epoch: val.truncate_epoch,
366            new_log: Some(val.new_log.into()),
367        }
368    }
369}
370
371impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
372where
373    T: From<PbSstableInfo>,
374{
375    fn from(val: PbChangeLogDelta) -> Self {
376        Self {
377            truncate_epoch: val.truncate_epoch,
378            new_log: val.new_log.unwrap().into(),
379        }
380    }
381}
382
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a change log entry without any SSTs for the given epochs.
    fn empty_log(non_checkpoint_epochs: Vec<u64>, checkpoint_epoch: u64) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            non_checkpoint_epochs,
            checkpoint_epoch,
        }
    }

    /// Checks `filter_epoch` against a linear scan for every `min <= max` pair in `1..=11`,
    /// then checks `next_epoch` against a lookup in the flattened epoch list.
    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 6),
            empty_log(vec![8], 10),
        ]);

        let epochs = (1..=11).collect_vec();
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        log.checkpoint_epoch >= min_epoch && log.first_epoch() <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let first_not_less = existing_epochs
                .iter()
                .position(|existing_epoch| *existing_epoch >= epoch);
            let expected = match first_not_less {
                // all existing epochs are less than epoch
                None => Ok(None),
                // epoch exists: its successor is the next element, or `None` for the latest
                Some(i) if existing_epochs[i] == epoch => Ok(existing_epochs.get(i + 1).copied()),
                // epoch is not an existing epoch
                Some(_) => Err(()),
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    /// Truncates the same change log at increasing epochs and compares each result with the
    /// expectation derived from the untouched original.
    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            empty_log(vec![], 1),
            empty_log(vec![], 2),
            empty_log(vec![3], 4),
            empty_log(vec![], 5),
        ]);
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            let expected_table_change_log = TableChangeLogCommon(
                origin_table_change_log
                    .0
                    .iter()
                    .cloned()
                    .map(|mut epoch_change_log| {
                        epoch_change_log
                            .non_checkpoint_epochs
                            .retain(|epoch| *epoch >= truncate_epoch);
                        epoch_change_log
                    })
                    // An entry survives if any of its epochs is still at or above the cut.
                    .filter(|epoch_change_log| {
                        !epoch_change_log.non_checkpoint_epochs.is_empty()
                            || epoch_change_log.checkpoint_epoch >= truncate_epoch
                    })
                    .collect(),
            );
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}