risingwave_hummock_sdk/
change_log.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
19use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo, PbTableChangeLog};
20use tracing::warn;
21
22use crate::HummockObjectId;
23use crate::sstable_info::SstableInfo;
24use crate::version::ObjectIdReader;
25
/// Change log of a table: a queue of [`EpochNewChangeLogCommon`] entries, generic over
/// the element type `T` stored in each entry.
#[derive(Debug, Clone, PartialEq)]
pub struct TableChangeLogCommon<T>(
    // older log at the front
    VecDeque<EpochNewChangeLogCommon<T>>,
);
31
32impl<T> TableChangeLogCommon<T> {
33    pub fn new(logs: impl IntoIterator<Item = EpochNewChangeLogCommon<T>>) -> Self {
34        let logs = logs.into_iter().collect::<VecDeque<_>>();
35        debug_assert!(logs.iter().flat_map(|log| log.epochs()).is_sorted());
36        Self(logs)
37    }
38
39    pub fn iter(&self) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> {
40        self.0.iter()
41    }
42
43    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
44        self.0.iter_mut()
45    }
46
47    pub fn add_change_log(&mut self, new_change_log: EpochNewChangeLogCommon<T>) {
48        if let Some(prev_log) = self.0.back() {
49            assert!(prev_log.checkpoint_epoch < new_change_log.first_epoch());
50        }
51        self.0.push_back(new_change_log);
52    }
53
54    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
55        self.0
56            .iter()
57            .flat_map(|epoch_change_log| epoch_change_log.epochs())
58    }
59
60    pub fn is_empty(&self) -> bool {
61        self.0.is_empty()
62    }
63
64    pub fn binary_search_by_epoch(&self, epoch: u64) -> Result<usize, usize> {
65        self.0
66            .binary_search_by_key(&epoch, |log| log.checkpoint_epoch)
67    }
68}
69
70impl<T> IntoIterator for TableChangeLogCommon<T> {
71    type Item = EpochNewChangeLogCommon<T>;
72
73    type IntoIter = impl Iterator<Item = EpochNewChangeLogCommon<T>>;
74
75    fn into_iter(self) -> Self::IntoIter {
76        self.0.into_iter()
77    }
78}
79
/// [`TableChangeLogCommon`] whose entries hold [`SstableInfo`].
pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
/// Change logs keyed by [`TableId`].
pub type TableChangeLogs = HashMap<TableId, TableChangeLog>;
82
83impl TableChangeLog {
84    pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
85        self.0.iter().flat_map(|c| {
86            c.old_value
87                .iter()
88                .chain(c.new_value.iter())
89                .map(|t| HummockObjectId::Sstable(t.object_id()))
90        })
91    }
92}
93
/// One change log entry: the items of type `T` recorded as new and old values, together
/// with the epochs the entry covers.
#[derive(Debug, Clone, PartialEq)]
pub struct EpochNewChangeLogCommon<T> {
    /// Items recorded as new values.
    pub new_value: Vec<T>,
    /// Items recorded as old values.
    pub old_value: Vec<T>,
    // epochs are sorted in ascending order
    pub non_checkpoint_epochs: Vec<u64>,
    /// Epoch that `epochs()` yields last, after all `non_checkpoint_epochs`.
    pub checkpoint_epoch: u64,
}
102
103impl EpochNewChangeLog {
104    pub fn change_log_ssts(&self) -> impl Iterator<Item = &SstableInfo> + '_ {
105        self.new_value.iter().chain(self.old_value.iter())
106    }
107}
108
/// Splits the epoch list of a protobuf change log into `(non_checkpoint_epochs,
/// checkpoint_epoch)`.
///
/// The last element of `epochs` is returned as the checkpoint epoch; all elements before
/// it are returned, in their original order, as the non-checkpoint epochs.
///
/// # Panics
///
/// Panics with the message `non-empty` if `epochs` is empty.
pub(crate) fn resolve_pb_log_epochs(epochs: &[u64]) -> (Vec<u64>, u64) {
    // `split_last` checks for emptiness before any slicing happens, so an empty input
    // fails with the intended message instead of an index/overflow panic.
    let (checkpoint_epoch, non_checkpoint_epochs) = epochs.split_last().expect("non-empty");
    (non_checkpoint_epochs.to_vec(), *checkpoint_epoch)
}
115
116impl<T> EpochNewChangeLogCommon<T> {
117    pub fn epochs(&self) -> impl Iterator<Item = u64> + '_ {
118        self.non_checkpoint_epochs
119            .iter()
120            .cloned()
121            .chain([self.checkpoint_epoch])
122    }
123
124    pub fn first_epoch(&self) -> u64 {
125        self.non_checkpoint_epochs
126            .first()
127            .cloned()
128            .unwrap_or(self.checkpoint_epoch)
129    }
130}
131
/// [`EpochNewChangeLogCommon`] whose values are [`SstableInfo`].
pub type EpochNewChangeLog = EpochNewChangeLogCommon<SstableInfo>;
133
134impl<T> From<&EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
135where
136    PbSstableInfo: for<'a> From<&'a T>,
137{
138    fn from(val: &EpochNewChangeLogCommon<T>) -> Self {
139        Self {
140            new_value: val.new_value.iter().map(|a| a.into()).collect(),
141            old_value: val.old_value.iter().map(|a| a.into()).collect(),
142            epochs: val.epochs().collect(),
143        }
144    }
145}
146
147impl<T> From<&PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
148where
149    T: for<'a> From<&'a PbSstableInfo>,
150{
151    fn from(value: &PbEpochNewChangeLog) -> Self {
152        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
153        Self {
154            new_value: value.new_value.iter().map(|a| a.into()).collect(),
155            old_value: value.old_value.iter().map(|a| a.into()).collect(),
156            non_checkpoint_epochs,
157            checkpoint_epoch,
158        }
159    }
160}
161
162impl<T> From<EpochNewChangeLogCommon<T>> for PbEpochNewChangeLog
163where
164    PbSstableInfo: From<T>,
165{
166    fn from(val: EpochNewChangeLogCommon<T>) -> Self {
167        Self {
168            epochs: val.epochs().collect(),
169            new_value: val.new_value.into_iter().map(|a| a.into()).collect(),
170            old_value: val.old_value.into_iter().map(|a| a.into()).collect(),
171        }
172    }
173}
174
175impl<T> From<PbEpochNewChangeLog> for EpochNewChangeLogCommon<T>
176where
177    T: From<PbSstableInfo>,
178{
179    fn from(value: PbEpochNewChangeLog) -> Self {
180        let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(&value.epochs);
181        Self {
182            new_value: value.new_value.into_iter().map(|a| a.into()).collect(),
183            old_value: value.old_value.into_iter().map(|a| a.into()).collect(),
184            non_checkpoint_epochs,
185            checkpoint_epoch,
186        }
187    }
188}
189
190impl<T> TableChangeLogCommon<T> {
191    pub fn filter_epoch(
192        &self,
193        (min_epoch, max_epoch): (u64, u64),
194    ) -> impl Iterator<Item = &EpochNewChangeLogCommon<T>> + '_ {
195        let start = self
196            .0
197            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < min_epoch);
198        let end = self
199            .0
200            .partition_point(|epoch_change_log| epoch_change_log.first_epoch() <= max_epoch);
201        self.0.range(start..end)
202    }
203
204    /// Get the `next_epoch` of the given `epoch`
205    /// Return:
206    ///     - Ok(Some(`next_epoch`)): the `next_epoch` of `epoch`
207    ///     - Ok(None): `next_epoch` of `epoch` is not added to change log yet
208    ///     - Err(()): `epoch` is not an existing or to exist one
209    #[expect(clippy::result_unit_err)]
210    pub fn next_epoch(&self, epoch: u64) -> Result<Option<u64>, ()> {
211        let start = self
212            .0
213            .partition_point(|epoch_change_log| epoch_change_log.checkpoint_epoch < epoch);
214        debug_assert!(
215            self.0
216                .range(start..)
217                .flat_map(|epoch_change_log| epoch_change_log.epochs())
218                .is_sorted()
219        );
220        let mut later_epochs = self
221            .0
222            .range(start..)
223            .flat_map(|epoch_change_log| epoch_change_log.epochs())
224            .skip_while(|log_epoch| *log_epoch < epoch);
225        if let Some(first_epoch) = later_epochs.next() {
226            assert!(
227                first_epoch >= epoch,
228                "first_epoch {} < epoch {}",
229                first_epoch,
230                epoch
231            );
232            if first_epoch != epoch {
233                return Err(());
234            }
235            if let Some(next_epoch) = later_epochs.next() {
236                assert!(
237                    next_epoch > epoch,
238                    "next_epoch {} not exceed epoch {}",
239                    next_epoch,
240                    epoch
241                );
242                Ok(Some(next_epoch))
243            } else {
244                // `epoch` is latest
245                Ok(None)
246            }
247        } else {
248            // all epochs are less than `epoch`
249            Ok(None)
250        }
251    }
252
253    pub fn truncate(&mut self, truncate_epoch: u64) {
254        while let Some(change_log) = self.0.front()
255            && change_log.checkpoint_epoch < truncate_epoch
256        {
257            let _change_log = self.0.pop_front().expect("non-empty");
258        }
259        if let Some(first_log) = self.0.front_mut() {
260            first_log
261                .non_checkpoint_epochs
262                .retain(|epoch| *epoch >= truncate_epoch);
263        }
264    }
265}
266
267impl<T> TableChangeLogCommon<T>
268where
269    PbSstableInfo: for<'a> From<&'a T>,
270{
271    pub fn to_protobuf(&self) -> PbTableChangeLog {
272        PbTableChangeLog {
273            change_logs: self.0.iter().map(|a| a.into()).collect(),
274        }
275    }
276}
277
278impl<T> TableChangeLogCommon<T>
279where
280    T: for<'a> From<&'a PbSstableInfo>,
281{
282    pub fn from_protobuf(val: &PbTableChangeLog) -> Self {
283        Self(val.change_logs.iter().map(|a| a.into()).collect())
284    }
285}
286
287impl<T> TableChangeLogCommon<T>
288where
289    T: From<PbSstableInfo>,
290{
291    pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
292        Self(val.change_logs.into_iter().map(|a| a.into()).collect())
293    }
294}
295
296pub fn build_table_change_log_delta<'a>(
297    old_value_ssts: impl Iterator<Item = SstableInfo>,
298    new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
299    epochs: &Vec<u64>,
300    log_store_table_ids: impl Iterator<Item = (TableId, u64)>,
301) -> HashMap<TableId, ChangeLogDelta> {
302    let mut table_change_log: HashMap<_, _> = log_store_table_ids
303        .map(|(table_id, truncate_epoch)| {
304            let (non_checkpoint_epochs, checkpoint_epoch) = resolve_pb_log_epochs(epochs);
305            (
306                table_id,
307                ChangeLogDelta {
308                    truncate_epoch,
309                    new_log: EpochNewChangeLog {
310                        new_value: vec![],
311                        old_value: vec![],
312                        non_checkpoint_epochs,
313                        checkpoint_epoch,
314                    },
315                },
316            )
317        })
318        .collect();
319    for sst in old_value_ssts {
320        for table_id in &sst.table_ids {
321            match table_change_log.get_mut(table_id) {
322                Some(log) => {
323                    log.new_log.old_value.push(sst.clone());
324                }
325                None => {
326                    warn!(%table_id, ?sst, "old value sst contains non-log-store table");
327                }
328            }
329        }
330    }
331    for sst in new_value_ssts {
332        for table_id in &sst.table_ids {
333            if let Some(log) = table_change_log.get_mut(table_id) {
334                log.new_log.new_value.push(sst.clone());
335            }
336        }
337    }
338    table_change_log
339}
340
/// A change to a table's change log: one new entry plus a truncate epoch.
#[derive(Debug, PartialEq, Clone)]
pub struct ChangeLogDeltaCommon<T> {
    /// Truncate epoch carried by this delta.
    // NOTE(review): presumably passed to `TableChangeLogCommon::truncate` when the delta
    // is applied — confirm against the code that applies deltas.
    pub truncate_epoch: u64,
    /// The change log entry carried by this delta.
    pub new_log: EpochNewChangeLogCommon<T>,
}
346
/// [`ChangeLogDeltaCommon`] whose entry holds [`SstableInfo`].
pub type ChangeLogDelta = ChangeLogDeltaCommon<SstableInfo>;
348
349impl<T> From<&ChangeLogDeltaCommon<T>> for PbChangeLogDelta
350where
351    PbSstableInfo: for<'a> From<&'a T>,
352{
353    fn from(val: &ChangeLogDeltaCommon<T>) -> Self {
354        Self {
355            truncate_epoch: val.truncate_epoch,
356            new_log: Some((&val.new_log).into()),
357        }
358    }
359}
360
361impl<T> From<&PbChangeLogDelta> for ChangeLogDeltaCommon<T>
362where
363    T: for<'a> From<&'a PbSstableInfo>,
364{
365    fn from(val: &PbChangeLogDelta) -> Self {
366        Self {
367            truncate_epoch: val.truncate_epoch,
368            new_log: val.new_log.as_ref().unwrap().into(),
369        }
370    }
371}
372
373impl<T> From<ChangeLogDeltaCommon<T>> for PbChangeLogDelta
374where
375    PbSstableInfo: From<T>,
376{
377    fn from(val: ChangeLogDeltaCommon<T>) -> Self {
378        Self {
379            truncate_epoch: val.truncate_epoch,
380            new_log: Some(val.new_log.into()),
381        }
382    }
383}
384
385impl<T> From<PbChangeLogDelta> for ChangeLogDeltaCommon<T>
386where
387    T: From<PbSstableInfo>,
388{
389    fn from(val: PbChangeLogDelta) -> Self {
390        Self {
391            truncate_epoch: val.truncate_epoch,
392            new_log: val.new_log.unwrap().into(),
393        }
394    }
395}
396
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::change_log::{EpochNewChangeLog, TableChangeLogCommon};
    use crate::sstable_info::SstableInfo;

    /// Builds a change log entry without any SSTs, so that only its epochs matter.
    fn epoch_log(non_checkpoint_epochs: Vec<u64>, checkpoint_epoch: u64) -> EpochNewChangeLog {
        EpochNewChangeLog {
            new_value: vec![],
            old_value: vec![],
            non_checkpoint_epochs,
            checkpoint_epoch,
        }
    }

    /// Compares `filter_epoch` and `next_epoch` against brute-force computations for every
    /// epoch, and every ordered pair of epochs, in `1..=11`.
    #[test]
    fn test_filter_epoch() {
        let table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            epoch_log(vec![], 2),
            epoch_log(vec![3], 4),
            epoch_log(vec![], 6),
            epoch_log(vec![8], 10),
        ]);

        let epochs: Vec<u64> = (1..=11).collect();
        for (i, &min_epoch) in epochs.iter().enumerate() {
            for &max_epoch in &epochs[i..] {
                // Brute force: keep every entry that overlaps `[min_epoch, max_epoch]`.
                let expected = table_change_log
                    .0
                    .iter()
                    .filter(|log| {
                        min_epoch <= log.checkpoint_epoch && log.first_epoch() <= max_epoch
                    })
                    .cloned()
                    .collect_vec();
                let actual = table_change_log
                    .filter_epoch((min_epoch, max_epoch))
                    .cloned()
                    .collect_vec();
                assert_eq!(expected, actual, "{:?}", (min_epoch, max_epoch));
            }
        }

        let existing_epochs = table_change_log.epochs().collect_vec();
        assert!(existing_epochs.is_sorted());
        for &epoch in &epochs {
            let first_not_less = existing_epochs
                .iter()
                .position(|&existing_epoch| existing_epoch >= epoch);
            let expected = match first_not_less {
                // all existing epochs are less than epoch
                None => Ok(None),
                // epoch exists: the following epoch is next, or `None` when epoch is latest
                Some(i) if existing_epochs[i] == epoch => {
                    Ok(existing_epochs.get(i + 1).copied())
                }
                // epoch is not an existing epoch
                Some(_) => Err(()),
            };
            assert_eq!(expected, table_change_log.next_epoch(epoch));
        }
    }

    /// Truncates step by step and compares against a filter over the untouched original.
    #[test]
    fn test_truncate() {
        let mut table_change_log = TableChangeLogCommon::<SstableInfo>::new([
            epoch_log(vec![], 1),
            epoch_log(vec![], 2),
            epoch_log(vec![3], 4),
            epoch_log(vec![], 5),
        ]);
        let origin_table_change_log = table_change_log.clone();
        for truncate_epoch in 0..6 {
            table_change_log.truncate(truncate_epoch);
            let expected_logs = origin_table_change_log
                .0
                .iter()
                .cloned()
                .filter_map(|mut epoch_change_log| {
                    epoch_change_log
                        .non_checkpoint_epochs
                        .retain(|epoch| *epoch >= truncate_epoch);
                    // An entry disappears only once all of its epochs are truncated.
                    let fully_truncated = epoch_change_log.non_checkpoint_epochs.is_empty()
                        && epoch_change_log.checkpoint_epoch < truncate_epoch;
                    (!fully_truncated).then_some(epoch_change_log)
                })
                .collect();
            let expected_table_change_log = TableChangeLogCommon(expected_logs);
            assert_eq!(expected_table_change_log, table_change_log);
        }
    }
}