risingwave_storage/hummock/iterator/
forward_concat.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::hummock::SstableIterator;
16use crate::hummock::iterator::concat_inner::ConcatIteratorInner;
17
18/// Iterates on multiple non-overlapping tables.
19pub type ConcatIterator = ConcatIteratorInner<SstableIterator>;
20
21#[cfg(test)]
22mod tests {
23    use std::sync::Arc;
24
25    use super::*;
26    use crate::hummock::iterator::HummockIterator;
27    use crate::hummock::iterator::test_utils::{
28        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_info,
29        iterator_test_key_of, iterator_test_value_of, mock_sstable_store,
30    };
31    use crate::hummock::sstable::SstableIteratorReadOptions;
32
33    #[tokio::test]
34    async fn test_concat_iterator() {
35        let sstable_store = mock_sstable_store().await;
36        let table0 = gen_iterator_test_sstable_info(
37            0,
38            default_builder_opt_for_test(),
39            |x| x,
40            sstable_store.clone(),
41            TEST_KEYS_COUNT,
42        )
43        .await;
44        let table1 = gen_iterator_test_sstable_info(
45            1,
46            default_builder_opt_for_test(),
47            |x| TEST_KEYS_COUNT + x,
48            sstable_store.clone(),
49            TEST_KEYS_COUNT,
50        )
51        .await;
52        let table2 = gen_iterator_test_sstable_info(
53            2,
54            default_builder_opt_for_test(),
55            |x| TEST_KEYS_COUNT * 2 + x,
56            sstable_store.clone(),
57            TEST_KEYS_COUNT,
58        )
59        .await;
60        let mut iter = ConcatIterator::new(
61            vec![table0, table1, table2],
62            sstable_store,
63            Arc::new(SstableIteratorReadOptions::default()),
64        );
65        let mut i = 0;
66        iter.rewind().await.unwrap();
67
68        while iter.is_valid() {
69            let key = iter.key();
70            let val = iter.value();
71            assert_eq!(key, iterator_test_key_of(i).to_ref());
72            assert_eq!(
73                val.into_user_value().unwrap(),
74                iterator_test_value_of(i).as_slice()
75            );
76            i += 1;
77            iter.next().await.unwrap();
78            if i == TEST_KEYS_COUNT * 3 {
79                assert!(!iter.is_valid());
80                break;
81            }
82        }
83
84        iter.rewind().await.unwrap();
85        let key = iter.key();
86        let val = iter.value();
87        assert_eq!(key, iterator_test_key_of(0).to_ref());
88        assert_eq!(
89            val.into_user_value().unwrap(),
90            iterator_test_value_of(0).as_slice()
91        );
92    }
93
94    #[tokio::test]
95    async fn test_concat_seek() {
96        let sstable_store = mock_sstable_store().await;
97        let table0 = gen_iterator_test_sstable_info(
98            0,
99            default_builder_opt_for_test(),
100            |x| x,
101            sstable_store.clone(),
102            TEST_KEYS_COUNT,
103        )
104        .await;
105        let table1 = gen_iterator_test_sstable_info(
106            1,
107            default_builder_opt_for_test(),
108            |x| TEST_KEYS_COUNT + x,
109            sstable_store.clone(),
110            TEST_KEYS_COUNT,
111        )
112        .await;
113        let table2 = gen_iterator_test_sstable_info(
114            2,
115            default_builder_opt_for_test(),
116            |x| TEST_KEYS_COUNT * 2 + x,
117            sstable_store.clone(),
118            TEST_KEYS_COUNT,
119        )
120        .await;
121        let mut iter = ConcatIterator::new(
122            vec![table0, table1, table2],
123            sstable_store,
124            Arc::new(SstableIteratorReadOptions::default()),
125        );
126
127        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref())
128            .await
129            .unwrap();
130
131        let key = iter.key();
132        let val = iter.value();
133        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref());
134        assert_eq!(
135            val.into_user_value().unwrap(),
136            iterator_test_value_of(TEST_KEYS_COUNT + 1).as_slice()
137        );
138
139        // Left edge case
140        iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap();
141        let key = iter.key();
142        let val = iter.value();
143        assert_eq!(key, iterator_test_key_of(0).to_ref());
144        assert_eq!(
145            val.into_user_value().unwrap(),
146            iterator_test_value_of(0).as_slice()
147        );
148
149        // Right edge case
150        iter.seek(iterator_test_key_of(3 * TEST_KEYS_COUNT - 1).to_ref())
151            .await
152            .unwrap();
153
154        let key = iter.key();
155        let val = iter.value();
156        assert_eq!(key, iterator_test_key_of(3 * TEST_KEYS_COUNT - 1).to_ref());
157        assert_eq!(
158            val.into_user_value().unwrap(),
159            iterator_test_value_of(3 * TEST_KEYS_COUNT - 1).as_slice()
160        );
161
162        // Right overflow case
163        iter.seek(iterator_test_key_of(3 * TEST_KEYS_COUNT).to_ref())
164            .await
165            .unwrap();
166        assert!(!iter.is_valid());
167    }
168
169    #[tokio::test]
170    async fn test_concat_seek_not_exists() {
171        let sstable_store = mock_sstable_store().await;
172        let table0 = gen_iterator_test_sstable_info(
173            0,
174            default_builder_opt_for_test(),
175            |x| x * 2,
176            sstable_store.clone(),
177            TEST_KEYS_COUNT,
178        )
179        .await;
180        let table1 = gen_iterator_test_sstable_info(
181            1,
182            default_builder_opt_for_test(),
183            |x| (TEST_KEYS_COUNT + x) * 2,
184            sstable_store.clone(),
185            TEST_KEYS_COUNT,
186        )
187        .await;
188        let table2 = gen_iterator_test_sstable_info(
189            2,
190            default_builder_opt_for_test(),
191            |x| (2 * TEST_KEYS_COUNT + x) * 2,
192            sstable_store.clone(),
193            TEST_KEYS_COUNT,
194        )
195        .await;
196        let mut iter = ConcatIterator::new(
197            vec![table0, table1, table2],
198            sstable_store,
199            Arc::new(SstableIteratorReadOptions::default()),
200        );
201
202        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref())
203            .await
204            .unwrap();
205
206        let key = iter.key();
207        let val = iter.value();
208        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT + 2).to_ref());
209        assert_eq!(
210            val.into_user_value().unwrap(),
211            iterator_test_value_of(TEST_KEYS_COUNT + 2).as_slice()
212        );
213
214        // seek the last of table1
215        iter.seek(iterator_test_key_of((TEST_KEYS_COUNT + 9) * 2 + 1).to_ref())
216            .await
217            .unwrap();
218
219        let key = iter.key();
220        let val = iter.value();
221        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT * 4).to_ref());
222        assert_eq!(
223            val.into_user_value().unwrap(),
224            iterator_test_value_of(TEST_KEYS_COUNT * 4).as_slice()
225        );
226    }
227}