risingwave_storage/hummock/iterator/
forward_merge.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod test {
17    use std::future::{pending, poll_fn};
18    use std::iter::once;
19    use std::sync::Arc;
20    use std::task::Poll;
21
22    use futures::{FutureExt, pin_mut};
23    use risingwave_hummock_sdk::EpochWithGap;
24    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
25
26    use crate::hummock::HummockResult;
27    use crate::hummock::iterator::test_utils::{
28        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_info,
29        gen_merge_iterator_interleave_test_sstable_iters, iterator_test_key_of,
30        iterator_test_value_of, mock_sstable_store,
31    };
32    use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, ValueMeta};
33    use crate::hummock::sstable::{
34        SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
35    };
36    use crate::hummock::value::HummockValue;
37    use crate::monitor::StoreLocalStatistic;
38
39    #[tokio::test]
40    async fn test_merge_basic() {
41        let mut iter = MergeIterator::new(
42            gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await,
43        );
44
45        // Test merge iterators
46        let mut i = 0;
47        iter.rewind().await.unwrap();
48        while iter.is_valid() {
49            let key = iter.key();
50            let val = iter.value();
51            assert_eq!(key, iterator_test_key_of(i).to_ref());
52            assert_eq!(
53                val.into_user_value().unwrap(),
54                iterator_test_value_of(i).as_slice()
55            );
56            i += 1;
57            iter.next().await.unwrap();
58            if i == TEST_KEYS_COUNT * 3 {
59                assert!(!iter.is_valid());
60                break;
61            }
62        }
63        assert!(i >= TEST_KEYS_COUNT * 3);
64    }
65
66    #[tokio::test]
67    async fn test_merge_seek() {
68        let mut iter = MergeIterator::new(
69            gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await,
70        );
71
72        // Test merge iterators
73        // right edge case
74        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 3).to_ref())
75            .await
76            .unwrap();
77        assert!(!iter.is_valid());
78
79        // normal case
80        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref())
81            .await
82            .unwrap();
83        let k = iter.key();
84        let v = iter.value();
85        assert_eq!(
86            v.into_user_value().unwrap(),
87            iterator_test_value_of(TEST_KEYS_COUNT * 2 + 5).as_slice()
88        );
89        assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref());
90
91        iter.seek(iterator_test_key_of(17).to_ref()).await.unwrap();
92        let k = iter.key();
93        let v = iter.value();
94        assert_eq!(
95            v.into_user_value().unwrap(),
96            iterator_test_value_of(TEST_KEYS_COUNT + 7).as_slice()
97        );
98        assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT + 7).to_ref());
99
100        // left edge case
101        iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap();
102        let k = iter.key();
103        let v = iter.value();
104        assert_eq!(
105            v.into_user_value().unwrap(),
106            iterator_test_value_of(0).as_slice()
107        );
108        assert_eq!(k, iterator_test_key_of(0).to_ref());
109    }
110
111    #[tokio::test]
112    async fn test_merge_invalidate_reset() {
113        let sstable_store = mock_sstable_store().await;
114        let read_options = Arc::new(SstableIteratorReadOptions::default());
115        let table0 = gen_iterator_test_sstable_info(
116            0,
117            default_builder_opt_for_test(),
118            |x| x,
119            sstable_store.clone(),
120            TEST_KEYS_COUNT,
121        )
122        .await;
123        let table1 = gen_iterator_test_sstable_info(
124            1,
125            default_builder_opt_for_test(),
126            |x| TEST_KEYS_COUNT + x,
127            sstable_store.clone(),
128            TEST_KEYS_COUNT,
129        )
130        .await;
131
132        let mut stats = StoreLocalStatistic::default();
133        let mut iter = MergeIterator::new(vec![
134            SstableIterator::create(
135                sstable_store.sstable(&table0, &mut stats).await.unwrap(),
136                sstable_store.clone(),
137                read_options.clone(),
138                &table0,
139            ),
140            SstableIterator::create(
141                sstable_store.sstable(&table1, &mut stats).await.unwrap(),
142                sstable_store.clone(),
143                read_options.clone(),
144                &table1,
145            ),
146        ]);
147
148        iter.rewind().await.unwrap();
149        let mut count = 0;
150        while iter.is_valid() {
151            count += 1;
152            iter.next().await.unwrap();
153        }
154        assert_eq!(count, TEST_KEYS_COUNT * 2);
155
156        iter.rewind().await.unwrap();
157        let mut count = 0;
158        while iter.is_valid() {
159            count += 1;
160            iter.next().await.unwrap();
161        }
162        assert_eq!(count, TEST_KEYS_COUNT * 2);
163    }
164
165    struct CancellationTestIterator {}
166
167    impl HummockIterator for CancellationTestIterator {
168        type Direction = Forward;
169
170        async fn next(&mut self) -> HummockResult<()> {
171            pending::<HummockResult<()>>().await
172        }
173
174        fn key(&self) -> FullKey<&[u8]> {
175            FullKey {
176                user_key: UserKey {
177                    table_id: Default::default(),
178                    table_key: TableKey(&b"test_key"[..]),
179                },
180                epoch_with_gap: EpochWithGap::new_from_epoch(0),
181            }
182        }
183
184        fn value(&self) -> HummockValue<&[u8]> {
185            HummockValue::delete()
186        }
187
188        fn is_valid(&self) -> bool {
189            true
190        }
191
192        async fn rewind(&mut self) -> HummockResult<()> {
193            Ok(())
194        }
195
196        async fn seek<'a>(&'a mut self, _key: FullKey<&'a [u8]>) -> HummockResult<()> {
197            Ok(())
198        }
199
200        fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
201
202        fn value_meta(&self) -> ValueMeta {
203            ValueMeta::default()
204        }
205    }
206
207    #[tokio::test]
208    async fn test_merge_iter_cancel() {
209        let mut merge_iter = MergeIterator::new(vec![
210            MergeIterator::new(once(CancellationTestIterator {})),
211            MergeIterator::new(once(CancellationTestIterator {})),
212        ]);
213        merge_iter.rewind().await.unwrap();
214        let future = merge_iter.next();
215
216        pin_mut!(future);
217
218        for _ in 0..10 {
219            assert!(
220                poll_fn(|cx| { Poll::Ready(future.poll_unpin(cx)) })
221                    .await
222                    .is_pending()
223            );
224        }
225
226        // Dropping the future will panic if the OrderedMergeIterator is not cancellation safe.
227        // See https://github.com/risingwavelabs/risingwave/issues/6637
228    }
229}