risingwave_storage/hummock/iterator/
change_log.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::must_match;
20use risingwave_common::util::epoch::MAX_SPILL_TIMES;
21use risingwave_hummock_sdk::EpochWithGap;
22use risingwave_hummock_sdk::key::{
23    FullKey, SetSlice, TableKeyRange, UserKey, UserKeyRange, bound_table_key_range,
24};
25
26use crate::StateStoreIter;
27use crate::error::StorageResult;
28use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator};
29use crate::hummock::value::HummockValue;
30use crate::hummock::{HummockResult, SstableIterator};
31use crate::monitor::IterLocalMetricsGuard;
32use crate::store::{ChangeLogValue, StateStoreReadLogItem, StateStoreReadLogItemRef};
33
/// State of a change log scan that pairs a stream of new values with a stream of old values.
///
/// For each user key inside `key_range` that has at least one operation whose epoch lies in
/// `[min_epoch, max_epoch]`, the newest new value and the old value attached to the oldest
/// in-range epoch are combined into a single `ChangeLogValue`.
struct ChangeLogIteratorInner<
    NI: HummockIterator<Direction = Forward>,
    OI: HummockIterator<Direction = Forward>,
> {
    /// Iterator for new values. In each `next`, the iterator will iterate over all values of the current key.
    /// Therefore, we need to buffer the key and newest value in `curr_key` and `new_value`.
    ///
    /// We assume that all operations between `min_epoch` and `max_epoch` will be included in the `new_value_iter`.
    new_value_iter: NI,
    /// Iterator for old values. When `is_old_value_set` is true, its value is the old value in the change log value.
    ///
    /// We assume that each old value will have a new value of the same epoch in the `new_value_iter`. That is to say,
    /// for a specific key, we won't have an epoch that only exists in the `old_value_iter` but not in the `new_value_iter`.
    /// `Delete` also contains a tombstone value.
    old_value_iter: OI,
    /// Inclusive max epoch
    max_epoch: u64,
    /// Inclusive min epoch
    min_epoch: u64,
    /// User key range of the scan. The start bound is applied in `rewind`, the end bound while advancing.
    key_range: UserKeyRange,

    /// Buffer of current key
    curr_key: FullKey<Vec<u8>>,
    /// Buffer for new value. Only valid when `is_new_value_delete` is false; it is cleared when the
    /// newest operation of the current key is a delete.
    new_value: Vec<u8>,
    /// Indicates whether the current new value is a delete.
    is_new_value_delete: bool,

    /// Indicates whether the current position of `old_value_iter` represents the old value in `ChangeLogValue`
    is_old_value_set: bool,

    /// Whether the iterator is currently pointing at a valid key with `ChangeLogValue`
    is_current_pos_valid: bool,
}
68
69impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
70    ChangeLogIteratorInner<NI, OI>
71{
72    fn new(
73        (min_epoch, max_epoch): (u64, u64),
74        key_range: UserKeyRange,
75        new_value_iter: NI,
76        old_value_iter: OI,
77    ) -> Self {
78        Self {
79            new_value_iter,
80            old_value_iter,
81            min_epoch,
82            max_epoch,
83            key_range,
84
85            curr_key: FullKey::default(),
86            new_value: vec![],
87            is_new_value_delete: false,
88            is_old_value_set: false,
89            is_current_pos_valid: false,
90        }
91    }
92
93    /// Resets the iterating position to the beginning.
94    pub async fn rewind(&mut self) -> HummockResult<()> {
95        // Handle range scan
96        match &self.key_range.0 {
97            Included(begin_key) => {
98                let full_key = FullKey {
99                    user_key: begin_key.as_ref(),
100                    epoch_with_gap: EpochWithGap::new(self.max_epoch, MAX_SPILL_TIMES),
101                };
102                self.new_value_iter.seek(full_key).await?;
103                self.old_value_iter.seek(full_key).await?;
104            }
105            Excluded(_) => unimplemented!("excluded begin key is not supported"),
106            Unbounded => {
107                self.new_value_iter.rewind().await?;
108                self.old_value_iter.rewind().await?;
109            }
110        };
111
112        self.try_advance_to_next_change_log_value().await?;
113        Ok(())
114    }
115
116    pub async fn next(&mut self) -> HummockResult<()> {
117        self.try_advance_to_next_change_log_value().await
118    }
119
120    pub fn is_valid(&self) -> bool {
121        self.is_current_pos_valid
122    }
123
124    pub fn log_value(&self) -> ChangeLogValue<&[u8]> {
125        if self.is_new_value_delete {
126            ChangeLogValue::Delete(
127                self.old_value()
128                    .expect("should have old value when new value is delete"),
129            )
130        } else {
131            match self.old_value() {
132                Some(old_value) => ChangeLogValue::Update {
133                    new_value: self.new_value.as_slice(),
134                    old_value,
135                },
136                None => ChangeLogValue::Insert(self.new_value.as_slice()),
137            }
138        }
139    }
140
141    pub fn key(&self) -> UserKey<&[u8]> {
142        self.curr_key.user_key.as_ref()
143    }
144}
145
146impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
147    ChangeLogIteratorInner<NI, OI>
148{
149    async fn try_advance_to_next_change_log_value(&mut self) -> HummockResult<()> {
150        loop {
151            self.try_advance_to_next_valid().await?;
152            if !self.is_valid() {
153                break;
154            }
155            if self.has_log_value() {
156                break;
157            } else {
158                continue;
159            }
160        }
161        Ok(())
162    }
163
164    fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
165        // handle range scan
166        match &self.key_range.1 {
167            Included(end_key) => user_key > end_key.as_ref(),
168            Excluded(end_key) => user_key >= end_key.as_ref(),
169            Unbounded => false,
170        }
171    }
172
173    /// Advance the `new_value_iter` to a valid key and valid epoch.
174    async fn advance_to_valid_key(&mut self) -> HummockResult<()> {
175        self.is_current_pos_valid = false;
176        loop {
177            if !self.new_value_iter.is_valid() {
178                return Ok(());
179            }
180
181            let key = self.new_value_iter.key();
182
183            // Handle epoch visibility
184            if !self.is_valid_epoch(key.epoch_with_gap) {
185                self.new_value_iter.next().await?;
186                continue;
187            }
188
189            if self.user_key_out_of_range(key.user_key) {
190                return Ok(());
191            }
192
193            break;
194        }
195
196        debug_assert!(self.new_value_iter.is_valid());
197        debug_assert!(self.is_valid_epoch(self.new_value_iter.key().epoch_with_gap));
198        debug_assert!(!self.user_key_out_of_range(self.new_value_iter.key().user_key));
199        self.is_current_pos_valid = true;
200        // The key and value will be saved in a buffer, because in the next step we will
201        // continue advancing the `new_value_iter`.
202        self.curr_key.set(self.new_value_iter.key());
203        match self.new_value_iter.value() {
204            HummockValue::Put(val) => {
205                self.new_value.set(val);
206                self.is_new_value_delete = false;
207            }
208            HummockValue::Delete => {
209                self.new_value.clear();
210                self.is_new_value_delete = true;
211            }
212        }
213
214        Ok(())
215    }
216
217    /// Advance the `new_value_iter` to find the oldest epoch of the current key.
218    async fn advance_to_find_oldest_epoch(&mut self) -> HummockResult<EpochWithGap> {
219        let mut ret = self.curr_key.epoch_with_gap;
220        debug_assert!(self.is_valid_epoch(ret));
221        self.new_value_iter.next().await?;
222        loop {
223            if !self.new_value_iter.is_valid() {
224                break;
225            }
226            let key = self.new_value_iter.key();
227            match self.curr_key.user_key.as_ref().cmp(&key.user_key) {
228                Ordering::Less => {
229                    // has advance to next key
230                    break;
231                }
232                Ordering::Equal => {
233                    assert!(ret > key.epoch_with_gap);
234                    if !self.is_valid_epoch(key.epoch_with_gap) {
235                        debug_assert!(self.min_epoch > key.epoch_with_gap.pure_epoch());
236                        break;
237                    }
238                    ret = key.epoch_with_gap;
239                    self.new_value_iter.next().await?;
240                    continue;
241                }
242                Ordering::Greater => {
243                    unreachable!(
244                        "hummock iterator advance to a prev key: {:?} {:?}",
245                        self.curr_key,
246                        self.new_value_iter.key()
247                    );
248                }
249            }
250        }
251        debug_assert!(self.is_valid_epoch(ret));
252
253        Ok(ret)
254    }
255
256    /// Advance the two iters to a valid position. After it returns with Ok,
257    /// it is possible that the position is valid but there is no change log value,
258    /// because the new and old value may consume each other, such as Insert in old epoch,
259    /// but then Delete in new epoch
260    async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
261        // 1. advance the new_value_iter to the newest op between max and min epoch
262        self.advance_to_valid_key().await?;
263
264        if !self.is_current_pos_valid {
265            return Ok(());
266        }
267
268        // 2. advance new_value_iter to out of the valid range, and save the oldest value
269        let oldest_epoch = self.advance_to_find_oldest_epoch().await?;
270
271        // 3. iterate old value iter to the oldest epoch
272        self.is_old_value_set = false;
273        loop {
274            if !self.old_value_iter.is_valid() {
275                break;
276            }
277
278            let old_value_iter_key = self.old_value_iter.key();
279            match self
280                .curr_key
281                .user_key
282                .as_ref()
283                .cmp(&old_value_iter_key.user_key.as_ref())
284            {
285                Ordering::Less => {
286                    // old value iter has advanced over the current range
287                    break;
288                }
289                Ordering::Equal => match old_value_iter_key.epoch_with_gap.cmp(&oldest_epoch) {
290                    Ordering::Less => {
291                        // The assertion holds because we assume that for a specific key, any old value will have a new value of the same
292                        // epoch in the `new_value_iter`. If the assertion is broken, it means we must have a new value of the same epoch
293                        // that are valid but older than the `oldest_epoch`, which breaks the definition of `oldest_epoch`.
294                        assert!(
295                            old_value_iter_key.epoch_with_gap.pure_epoch() < self.min_epoch,
296                            "there should not be old value between oldest new_value and min_epoch. \
297                                new value key: {:?}, oldest epoch: {:?}, min epoch: {:?}, old value epoch: {:?}",
298                            self.curr_key,
299                            oldest_epoch,
300                            self.min_epoch,
301                            old_value_iter_key.epoch_with_gap
302                        );
303                        break;
304                    }
305                    Ordering::Equal => {
306                        self.is_old_value_set = true;
307                        break;
308                    }
309                    Ordering::Greater => {
310                        self.old_value_iter.next().await?;
311                        continue;
312                    }
313                },
314                Ordering::Greater => {
315                    self.old_value_iter.next().await?;
316                    continue;
317                }
318            }
319        }
320
321        Ok(())
322    }
323
324    fn is_valid_epoch(&self, epoch: EpochWithGap) -> bool {
325        let epoch = epoch.pure_epoch();
326        self.min_epoch <= epoch && epoch <= self.max_epoch
327    }
328
329    fn old_value(&self) -> Option<&[u8]> {
330        if self.is_old_value_set {
331            debug_assert!(self.old_value_iter.is_valid());
332            debug_assert_eq!(
333                self.old_value_iter.key().user_key,
334                self.curr_key.user_key.as_ref()
335            );
336            Some(must_match!(self.old_value_iter.value(), HummockValue::Put(val) => val))
337        } else {
338            None
339        }
340    }
341
342    fn has_log_value(&self) -> bool {
343        debug_assert!(self.is_current_pos_valid);
344        !self.is_new_value_delete || self.is_old_value_set
345    }
346}
347
348impl Drop for ChangeLogIterator {
349    fn drop(&mut self) {
350        self.inner
351            .new_value_iter
352            .collect_local_statistic(&mut self.stats_guard.local_stats);
353        self.inner
354            .old_value_iter
355            .collect_local_statistic(&mut self.stats_guard.local_stats);
356    }
357}
358
/// Change log iterator whose new-value and old-value sources are both
/// `MergeIterator<SstableIterator>`, exposed through `StateStoreIter`.
///
/// When dropped, the local statistics of both underlying iterators are collected into
/// `stats_guard`.
pub struct ChangeLogIterator {
    /// The generic iterator doing the actual work; already rewound by `new`.
    inner: ChangeLogIteratorInner<MergeIterator<SstableIterator>, MergeIterator<SstableIterator>>,
    /// `false` until the first `try_next` call. That first call returns the position reached by
    /// the initial rewind instead of advancing.
    initial_read: bool,
    /// Receives the iterators' local statistics on drop.
    stats_guard: IterLocalMetricsGuard,
}
364
365impl ChangeLogIterator {
366    pub async fn new(
367        epoch_range: (u64, u64),
368        table_key_range: TableKeyRange,
369        new_value_iter: MergeIterator<SstableIterator>,
370        old_value_iter: MergeIterator<SstableIterator>,
371        table_id: TableId,
372        stats_guard: IterLocalMetricsGuard,
373    ) -> HummockResult<Self> {
374        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
375        let (start_bound, end_bound) = (
376            user_key_range_ref.0.map(|key| key.cloned()),
377            user_key_range_ref.1.map(|key| key.cloned()),
378        );
379        let mut inner = ChangeLogIteratorInner::new(
380            epoch_range,
381            (start_bound, end_bound),
382            new_value_iter,
383            old_value_iter,
384        );
385        inner.rewind().await?;
386        Ok(Self {
387            inner,
388            initial_read: false,
389            stats_guard,
390        })
391    }
392}
393
394impl StateStoreIter<StateStoreReadLogItem> for ChangeLogIterator {
395    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
396        if !self.initial_read {
397            self.initial_read = true;
398        } else {
399            self.inner.next().await?;
400        }
401        if self.inner.is_valid() {
402            Ok(Some((self.inner.key().table_key, self.inner.log_value())))
403        } else {
404            Ok(None)
405        }
406    }
407}
408
#[cfg(any(test, feature = "test"))]
pub mod test_utils {
    use std::collections::HashMap;

    use bytes::Bytes;
    use rand::{Rng, RngCore, rng as thread_rng};
    use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, test_epoch};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::hummock::iterator::test_utils::iterator_test_table_key_of;
    use crate::mem_table::KeyOp;
    use crate::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions};

    /// Per-epoch operation log: `(epoch, operations applied in that epoch)`, in epoch order.
    pub type TestLogDataType = Vec<(u64, Vec<(TableKey<Bytes>, KeyOp)>)>;

    /// Generates random, self-consistent log data spanning exactly `epoch_count` epochs.
    ///
    /// For each of the first `epoch_count - 1` epochs every one of the `key_count` keys is
    /// skipped with probability `skip_ratio`. Otherwise, with probability `delete_ratio`, it is
    /// deleted if it currently exists, and in the remaining case it is written with a fresh
    /// random value (`Insert` for a new key, `Update` carrying the previous value otherwise).
    /// The last epoch deletes every key that is still alive.
    ///
    /// # Panics
    ///
    /// Panics if `epoch_count` is 0: the trailing delete-only epoch is always generated, so at
    /// least one epoch is required.
    pub fn gen_test_data(
        epoch_count: usize,
        key_count: usize,
        skip_ratio: f64,
        delete_ratio: f64,
    ) -> TestLogDataType {
        // Fail fast instead of underflowing `epoch_count - 1` when computing the epoch range.
        assert!(
            epoch_count >= 1,
            "epoch_count must be at least 1, got {epoch_count}"
        );

        // Latest value of every key that is currently alive.
        let mut store: HashMap<TableKey<Bytes>, Bytes> = HashMap::new();
        let mut rng = thread_rng();
        let mut logs = Vec::with_capacity(epoch_count);
        for epoch_idx in 1..epoch_count {
            let mut epoch_logs = Vec::new();
            let epoch = test_epoch(epoch_idx as _);
            for key_idx in 0..key_count {
                if rng.random_bool(skip_ratio) {
                    continue;
                }
                let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx)));
                if rng.random_bool(delete_ratio) {
                    // Deleting a key that does not exist produces no log entry.
                    if let Some(prev_value) = store.remove(&key) {
                        epoch_logs.push((key, KeyOp::Delete(prev_value)));
                    }
                } else {
                    let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes());
                    // `insert` hands back the previous value, which tells insert from update.
                    let op = match store.insert(key.clone(), value.clone()) {
                        Some(prev_value) => KeyOp::Update((prev_value, value)),
                        None => KeyOp::Insert(value),
                    };
                    epoch_logs.push((key, op));
                }
            }
            logs.push((epoch, epoch_logs));
        }

        // at the end add an epoch with only delete
        let final_epoch = test_epoch(epoch_count as _);
        let final_logs = store
            .into_iter()
            .map(|(key, value)| (key, KeyOp::Delete(value)))
            .collect();
        logs.push((final_epoch, final_logs));

        logs
    }

    /// Replays `log_data` against `state_store`, one epoch after another.
    ///
    /// The store is initialized at the first epoch. Before each later epoch the store is flushed
    /// and the current epoch is sealed. After every single operation `try_flush` is invoked with
    /// probability `try_flush_ratio`. Finally the store is flushed and sealed with `MAX_EPOCH`.
    ///
    /// # Panics
    ///
    /// Panics if `log_data` is empty or if any state store operation fails.
    pub async fn apply_test_log_data(
        log_data: TestLogDataType,
        state_store: &mut impl LocalStateStore,
        try_flush_ratio: f64,
    ) {
        let mut rng = thread_rng();
        let first_epoch = log_data
            .first()
            .expect("log_data must contain at least one epoch")
            .0;
        for (epoch, epoch_logs) in log_data {
            if epoch == first_epoch {
                state_store
                    .init(InitOptions {
                        epoch: EpochPair::new_test_epoch(epoch),
                    })
                    .await
                    .unwrap();
            } else {
                state_store.flush().await.unwrap();
                state_store.seal_current_epoch(
                    epoch,
                    SealCurrentEpochOptions {
                        table_watermarks: None,
                        switch_op_consistency_level: None,
                    },
                );
            }
            for (key, op) in epoch_logs {
                match op {
                    KeyOp::Insert(value) => {
                        state_store.insert(key, value, None).unwrap();
                    }
                    KeyOp::Delete(old_value) => {
                        state_store.delete(key, old_value).unwrap();
                    }
                    KeyOp::Update((old_value, value)) => {
                        state_store.insert(key, value, Some(old_value)).unwrap();
                    }
                }
                if rng.random_bool(try_flush_ratio) {
                    state_store.try_flush().await.unwrap();
                }
            }
        }
        state_store.flush().await.unwrap();
        state_store.seal_current_epoch(
            MAX_EPOCH,
            SealCurrentEpochOptions {
                table_watermarks: None,
                switch_op_consistency_level: None,
            },
        );
    }
}
525
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::ops::Bound::Unbounded;

    use bytes::Bytes;
    use itertools::Itertools;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{TableKey, UserKey};

    use crate::hummock::iterator::MergeIterator;
    use crate::hummock::iterator::change_log::ChangeLogIteratorInner;
    use crate::hummock::iterator::change_log::test_utils::{
        TestLogDataType, apply_test_log_data, gen_test_data,
    };
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore};
    use crate::memory::MemoryStateStore;
    use crate::store::{
        CHECK_BYTES_EQUAL, ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions,
        StateStoreReadLog,
    };
    use crate::{StateStore, StateStoreIter};

    /// Builds `count` `(table key, value)` pairs from the shared iterator test helpers, ordered
    /// by index.
    fn gen_test_kvs(count: usize) -> Vec<(TableKey<Bytes>, Bytes)> {
        (0..count)
            .map(|i| {
                let key = TableKey(Bytes::from(iterator_test_table_key_of(i)));
                let value = Bytes::from(iterator_test_value_of(i));
                (key, value)
            })
            .collect_vec()
    }

    /// An iterator over two empty inputs is invalid right after `rewind`.
    #[tokio::test]
    async fn test_empty() {
        let table_id = TableId::new(233);
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let pure_epoch = epoch.pure_epoch();
        let empty = BTreeMap::new();
        let mut iter = ChangeLogIteratorInner::new(
            (pure_epoch, pure_epoch),
            (Unbounded, Unbounded),
            MemTableHummockIterator::new(&empty, epoch, table_id),
            MemTableHummockIterator::new(&empty, epoch, table_id),
        );
        iter.rewind().await.unwrap();
        assert!(!iter.is_valid());
    }

    /// New values without any old value are all reported as `Insert`, in key order.
    #[tokio::test]
    async fn test_append_only() {
        let table_id = TableId::new(233);
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let kvs = gen_test_kvs(100);

        // One mem table per key, merged back together by the `MergeIterator`.
        let mut mem_tables = Vec::with_capacity(kvs.len());
        for (key, value) in &kvs {
            let mut mem_table = MemTable::new(OpConsistencyLevel::Inconsistent);
            mem_table.insert(key.clone(), value.clone()).unwrap();
            mem_tables.push(mem_table);
        }
        let new_value_iter = MergeIterator::new(
            mem_tables
                .iter()
                .map(|mem_table| MemTableHummockIterator::new(&mem_table.buffer, epoch, table_id)),
        );
        let empty = BTreeMap::new();
        let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);

        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Insert(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    /// Deletes paired with an old value of the same epoch are all reported as `Delete`.
    #[tokio::test]
    async fn test_delete_only() {
        let table_id = TableId::new(233);
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let kvs = gen_test_kvs(100);

        let mut new_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent);
        let mut old_value_memtable = MemTable::new(OpConsistencyLevel::Inconsistent);
        for (key, value) in &kvs {
            new_value_memtable
                .delete(key.clone(), Bytes::new())
                .unwrap();
            old_value_memtable
                .insert(key.clone(), value.clone())
                .unwrap();
        }

        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            MemTableHummockIterator::new(&new_value_memtable.buffer, epoch, table_id),
            MemTableHummockIterator::new(&old_value_memtable.buffer, epoch, table_id),
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Delete(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    /// Splits every epoch of `test_log_data` into a store of new values (the operations as
    /// given) and a store of old values (the previous value of each delete/update, recorded as
    /// an insert).
    fn gen_test_mem_table_store(
        test_log_data: TestLogDataType,
    ) -> Vec<(u64, MemTableStore, MemTableStore)> {
        test_log_data
            .into_iter()
            .map(|(epoch, epoch_logs)| {
                let mut new_values = MemTableStore::new();
                let mut old_values = MemTableStore::new();
                for (key, op) in epoch_logs {
                    let old_value = match &op {
                        KeyOp::Delete(old_value) | KeyOp::Update((old_value, _)) => {
                            Some(old_value.clone())
                        }
                        KeyOp::Insert(_) => None,
                    };
                    new_values.insert(key.clone(), op);
                    if let Some(old_value) = old_value {
                        old_values.insert(key, KeyOp::Insert(old_value));
                    }
                }
                (epoch, new_values, old_values)
            })
            .collect()
    }

    /// For every epoch sub-range of randomly generated data, the iterator must agree with the
    /// log read from `MemoryStateStore`.
    #[tokio::test]
    async fn test_random_data() {
        let table_id = TableId::new(233);
        let epoch_count = 10;
        let state_store = MemoryStateStore::new();
        let local_options = NewLocalOptions {
            table_id,
            op_consistency_level: OpConsistencyLevel::ConsistentOldValue {
                check_old_value: CHECK_BYTES_EQUAL.clone(),
                is_log_store: true,
            },
            table_option: Default::default(),
            is_replicated: false,
            vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
        };
        let mut local = state_store.new_local(local_options).await;

        let logs = gen_test_data(epoch_count, 10000, 0.05, 0.2);
        assert_eq!(logs.len(), epoch_count);
        apply_test_log_data(logs.clone(), &mut local, 0.0).await;
        let mem_table_logs = gen_test_mem_table_store(logs.clone());
        assert_eq!(mem_table_logs.len(), epoch_count);

        for start_epoch_idx in 0..epoch_count {
            for end_epoch_idx in start_epoch_idx..epoch_count {
                let epoch_range = (logs[start_epoch_idx].0, logs[end_epoch_idx].0);

                // Iterator under test, fed by the per-epoch mem tables.
                let new_value_iter =
                    MergeIterator::new(mem_table_logs.iter().map(|(epoch, new_values, _)| {
                        MemTableHummockIterator::new(
                            new_values,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    }));
                let old_value_iter =
                    MergeIterator::new(mem_table_logs.iter().map(|(epoch, _, old_values)| {
                        MemTableHummockIterator::new(
                            old_values,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    }));
                let mut change_log_iter = ChangeLogIteratorInner::new(
                    epoch_range,
                    (Unbounded, Unbounded),
                    new_value_iter,
                    old_value_iter,
                );
                change_log_iter.rewind().await.unwrap();

                // Reference result read back from the memory state store.
                let mut expected_change_log_iter = state_store
                    .iter_log(
                        epoch_range,
                        (Unbounded, Unbounded),
                        ReadLogOptions { table_id },
                    )
                    .await
                    .unwrap();

                while let Some((key, change_log_value)) =
                    expected_change_log_iter.try_next().await.unwrap()
                {
                    assert!(change_log_iter.is_valid());
                    assert_eq!(
                        change_log_iter.key(),
                        UserKey {
                            table_id,
                            table_key: key,
                        },
                    );
                    assert_eq!(change_log_iter.log_value(), change_log_value);
                    change_log_iter.next().await.unwrap();
                }
                assert!(!change_log_iter.is_valid());
            }
        }
    }
}