risingwave_storage/hummock/iterator/
forward_merge.rs1#[cfg(test)]
16mod test {
17 use std::future::{pending, poll_fn};
18 use std::iter::once;
19 use std::sync::Arc;
20 use std::task::Poll;
21
22 use futures::{FutureExt, pin_mut};
23 use risingwave_hummock_sdk::EpochWithGap;
24 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
25
26 use crate::hummock::HummockResult;
27 use crate::hummock::iterator::test_utils::{
28 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_info,
29 gen_merge_iterator_interleave_test_sstable_iters, iterator_test_key_of,
30 iterator_test_value_of, mock_sstable_store,
31 };
32 use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, ValueMeta};
33 use crate::hummock::sstable::{
34 SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
35 };
36 use crate::hummock::value::HummockValue;
37 use crate::monitor::StoreLocalStatistic;
38
39 #[tokio::test]
40 async fn test_merge_basic() {
41 let mut iter = MergeIterator::new(
42 gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await,
43 );
44
45 let mut i = 0;
47 iter.rewind().await.unwrap();
48 while iter.is_valid() {
49 let key = iter.key();
50 let val = iter.value();
51 assert_eq!(key, iterator_test_key_of(i).to_ref());
52 assert_eq!(
53 val.into_user_value().unwrap(),
54 iterator_test_value_of(i).as_slice()
55 );
56 i += 1;
57 iter.next().await.unwrap();
58 if i == TEST_KEYS_COUNT * 3 {
59 assert!(!iter.is_valid());
60 break;
61 }
62 }
63 assert!(i >= TEST_KEYS_COUNT * 3);
64 }
65
66 #[tokio::test]
67 async fn test_merge_seek() {
68 let mut iter = MergeIterator::new(
69 gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await,
70 );
71
72 iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 3).to_ref())
75 .await
76 .unwrap();
77 assert!(!iter.is_valid());
78
79 iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref())
81 .await
82 .unwrap();
83 let k = iter.key();
84 let v = iter.value();
85 assert_eq!(
86 v.into_user_value().unwrap(),
87 iterator_test_value_of(TEST_KEYS_COUNT * 2 + 5).as_slice()
88 );
89 assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref());
90
91 iter.seek(iterator_test_key_of(17).to_ref()).await.unwrap();
92 let k = iter.key();
93 let v = iter.value();
94 assert_eq!(
95 v.into_user_value().unwrap(),
96 iterator_test_value_of(TEST_KEYS_COUNT + 7).as_slice()
97 );
98 assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT + 7).to_ref());
99
100 iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap();
102 let k = iter.key();
103 let v = iter.value();
104 assert_eq!(
105 v.into_user_value().unwrap(),
106 iterator_test_value_of(0).as_slice()
107 );
108 assert_eq!(k, iterator_test_key_of(0).to_ref());
109 }
110
111 #[tokio::test]
112 async fn test_merge_invalidate_reset() {
113 let sstable_store = mock_sstable_store().await;
114 let read_options = Arc::new(SstableIteratorReadOptions::default());
115 let table0 = gen_iterator_test_sstable_info(
116 0,
117 default_builder_opt_for_test(),
118 |x| x,
119 sstable_store.clone(),
120 TEST_KEYS_COUNT,
121 )
122 .await;
123 let table1 = gen_iterator_test_sstable_info(
124 1,
125 default_builder_opt_for_test(),
126 |x| TEST_KEYS_COUNT + x,
127 sstable_store.clone(),
128 TEST_KEYS_COUNT,
129 )
130 .await;
131
132 let mut stats = StoreLocalStatistic::default();
133 let mut iter = MergeIterator::new(vec![
134 SstableIterator::create(
135 sstable_store.sstable(&table0, &mut stats).await.unwrap(),
136 sstable_store.clone(),
137 read_options.clone(),
138 &table0,
139 ),
140 SstableIterator::create(
141 sstable_store.sstable(&table1, &mut stats).await.unwrap(),
142 sstable_store.clone(),
143 read_options.clone(),
144 &table1,
145 ),
146 ]);
147
148 iter.rewind().await.unwrap();
149 let mut count = 0;
150 while iter.is_valid() {
151 count += 1;
152 iter.next().await.unwrap();
153 }
154 assert_eq!(count, TEST_KEYS_COUNT * 2);
155
156 iter.rewind().await.unwrap();
157 let mut count = 0;
158 while iter.is_valid() {
159 count += 1;
160 iter.next().await.unwrap();
161 }
162 assert_eq!(count, TEST_KEYS_COUNT * 2);
163 }
164
165 struct CancellationTestIterator {}
166
167 impl HummockIterator for CancellationTestIterator {
168 type Direction = Forward;
169
170 async fn next(&mut self) -> HummockResult<()> {
171 pending::<HummockResult<()>>().await
172 }
173
174 fn key(&self) -> FullKey<&[u8]> {
175 FullKey {
176 user_key: UserKey {
177 table_id: Default::default(),
178 table_key: TableKey(&b"test_key"[..]),
179 },
180 epoch_with_gap: EpochWithGap::new_from_epoch(0),
181 }
182 }
183
184 fn value(&self) -> HummockValue<&[u8]> {
185 HummockValue::delete()
186 }
187
188 fn is_valid(&self) -> bool {
189 true
190 }
191
192 async fn rewind(&mut self) -> HummockResult<()> {
193 Ok(())
194 }
195
196 async fn seek<'a>(&'a mut self, _key: FullKey<&'a [u8]>) -> HummockResult<()> {
197 Ok(())
198 }
199
200 fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
201
202 fn value_meta(&self) -> ValueMeta {
203 ValueMeta::default()
204 }
205 }
206
207 #[tokio::test]
208 async fn test_merge_iter_cancel() {
209 let mut merge_iter = MergeIterator::new(vec![
210 MergeIterator::new(once(CancellationTestIterator {})),
211 MergeIterator::new(once(CancellationTestIterator {})),
212 ]);
213 merge_iter.rewind().await.unwrap();
214 let future = merge_iter.next();
215
216 pin_mut!(future);
217
218 for _ in 0..10 {
219 assert!(
220 poll_fn(|cx| { Poll::Ready(future.poll_unpin(cx)) })
221 .await
222 .is_pending()
223 );
224 }
225
226 }
229}