1use std::ops::Bound::*;
16
17use more_asserts::debug_assert_ge;
18use risingwave_common::must_match;
19use risingwave_common::util::epoch::MAX_SPILL_TIMES;
20use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
21use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
22
23use crate::hummock::HummockResult;
24use crate::hummock::iterator::{Forward, HummockIterator};
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::value::HummockValue;
27use crate::monitor::StoreLocalStatistic;
28
29pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
31 iterator: I,
33
34 full_key_tracker: FullKeyTracker<Vec<u8>, true>,
36
37 key_range: UserKeyRange,
39
40 read_epoch: HummockEpoch,
42
43 min_epoch: HummockEpoch,
45
46 _version: Option<PinnedVersion>,
48
49 stats: StoreLocalStatistic,
50
51 is_current_pos_valid: bool,
53}
54
55impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
57 pub(crate) fn new(
59 iterator: I,
60 key_range: UserKeyRange,
61 read_epoch: u64,
62 min_epoch: u64,
63 version: Option<PinnedVersion>,
64 ) -> Self {
65 Self {
66 iterator,
67 key_range,
68 read_epoch,
69 min_epoch,
70 stats: StoreLocalStatistic::default(),
71 _version: version,
72 is_current_pos_valid: false,
73 full_key_tracker: FullKeyTracker::new(FullKey::default()),
74 }
75 }
76
77 pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
79 let read_epoch = HummockEpoch::MAX;
80 Self::new(iterator, key_range, read_epoch, 0, None)
81 }
82
83 pub async fn next(&mut self) -> HummockResult<()> {
91 self.is_current_pos_valid = false;
93 self.iterator.next().await?;
95
96 self.try_advance_to_next_valid().await
98 }
99
100 pub fn key(&self) -> FullKey<&[u8]> {
108 assert!(self.is_valid());
109 self.full_key_tracker.latest_full_key.to_ref()
110 }
111
112 pub fn value(&self) -> &[u8] {
116 assert!(self.is_valid());
117 must_match!(self.iterator.value(), HummockValue::Put(val) => val)
118 }
119
120 pub async fn rewind(&mut self) -> HummockResult<()> {
122 self.is_current_pos_valid = false;
124 self.full_key_tracker = FullKeyTracker::new(FullKey::default());
125
126 match &self.key_range.0 {
128 Included(begin_key) | Excluded(begin_key) => {
129 let full_key = FullKey {
130 user_key: begin_key.as_ref(),
131 epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
132 };
133 self.iterator.seek(full_key).await?;
134 }
135 Unbounded => {
136 self.iterator.rewind().await?;
137 }
138 };
139
140 self.try_advance_to_next_valid().await?;
141 if let Excluded(begin_key) = &self.key_range.0
142 && self.is_valid()
143 && self.key().user_key == begin_key.as_ref()
144 {
145 self.next().await?;
146 }
147 Ok(())
148 }
149
150 pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
152 self.is_current_pos_valid = false;
154 self.full_key_tracker = FullKeyTracker::new(FullKey::default());
155
156 let seek_key = match &self.key_range.0 {
158 Included(begin_key) | Excluded(begin_key) => {
159 let begin_key = begin_key.as_ref();
160 if begin_key > user_key {
161 begin_key
162 } else {
163 user_key
164 }
165 }
166 Unbounded => user_key,
167 };
168
169 let full_key = FullKey {
170 user_key: seek_key,
171 epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
172 };
173 self.iterator.seek(full_key).await?;
174
175 self.try_advance_to_next_valid().await?;
176 if let Excluded(begin_key) = &self.key_range.0
177 && self.is_valid()
178 && self.key().user_key == begin_key.as_ref()
179 {
180 debug_assert_ge!(begin_key.as_ref(), user_key);
181 self.next().await?;
182 }
183 Ok(())
184 }
185
186 pub fn is_valid(&self) -> bool {
188 self.is_current_pos_valid
189 }
190
191 pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
192 stats.add(&self.stats);
193 self.iterator.collect_local_statistic(stats);
194 }
195
196 async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
199 loop {
200 if !self.iterator.is_valid() {
201 break;
202 }
203
204 let full_key = self.iterator.key();
205 let epoch = full_key.epoch_with_gap.pure_epoch();
206
207 if epoch < self.min_epoch || epoch > self.read_epoch {
209 self.iterator.next().await?;
210 continue;
211 }
212
213 if !self.full_key_tracker.observe(full_key) {
215 self.stats.skip_multi_version_key_count += 1;
216 self.iterator.next().await?;
217 continue;
218 }
219
220 if self.user_key_out_of_range(full_key.user_key) {
223 break;
224 }
225
226 match self.iterator.value() {
228 HummockValue::Put(_val) => {
229 self.stats.processed_key_count += 1;
230 self.is_current_pos_valid = true;
231 return Ok(());
232 }
233 HummockValue::Delete => {
237 self.stats.skip_delete_key_count += 1;
238 }
239 }
240 self.iterator.next().await?;
241 }
242
243 self.is_current_pos_valid = false;
244 Ok(())
245 }
246
247 fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
249 match &self.key_range.1 {
251 Included(end_key) => user_key > end_key.as_ref(),
252 Excluded(end_key) => user_key >= end_key.as_ref(),
253 Unbounded => false,
254 }
255 }
256}
257
#[cfg(test)]
impl<I> UserIterator<I>
where
    I: HummockIterator<Direction = Forward>,
{
    /// Test-only constructor that lets the caller choose both epoch limits;
    /// no pinned version is stored.
    pub(crate) fn for_test_with_epoch(
        iterator: I,
        key_range: UserKeyRange,
        read_epoch: u64,
        min_epoch: u64,
    ) -> Self {
        let no_pinned_version = None;
        Self::new(iterator, key_range, read_epoch, min_epoch, no_pinned_version)
    }
}
269
270#[cfg(test)]
271mod tests {
272 use std::sync::Arc;
273
274 use bytes::Bytes;
275 use risingwave_common::util::epoch::test_epoch;
276 use risingwave_hummock_sdk::sstable_info::SstableInfo;
277
278 use super::*;
279 use crate::hummock::TableHolder;
280 use crate::hummock::iterator::MergeIterator;
281 use crate::hummock::iterator::test_utils::{
282 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
283 gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
284 gen_iterator_test_sstable_with_range_tombstones, iterator_test_bytes_key_of,
285 iterator_test_bytes_key_of_epoch, iterator_test_bytes_user_key_of, iterator_test_value_of,
286 mock_sstable_store,
287 };
288 use crate::hummock::sstable::{
289 SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
290 };
291 use crate::hummock::sstable_store::SstableStoreRef;
292
293 #[tokio::test]
294 async fn test_basic() {
295 let sstable_store = mock_sstable_store().await;
296 let read_options = Arc::new(SstableIteratorReadOptions::default());
297 let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
298 0,
299 default_builder_opt_for_test(),
300 |x| x * 3,
301 sstable_store.clone(),
302 TEST_KEYS_COUNT,
303 )
304 .await;
305 let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
306 1,
307 default_builder_opt_for_test(),
308 |x| x * 3 + 1,
309 sstable_store.clone(),
310 TEST_KEYS_COUNT,
311 )
312 .await;
313 let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
314 2,
315 default_builder_opt_for_test(),
316 |x| x * 3 + 2,
317 sstable_store.clone(),
318 TEST_KEYS_COUNT,
319 )
320 .await;
321 let iters = vec![
322 SstableIterator::create(
323 table0,
324 sstable_store.clone(),
325 read_options.clone(),
326 &sstable_info_0,
327 ),
328 SstableIterator::create(
329 table1,
330 sstable_store.clone(),
331 read_options.clone(),
332 &sstable_info_1,
333 ),
334 SstableIterator::create(table2, sstable_store, read_options.clone(), &sstable_info_2),
335 ];
336
337 let mi = MergeIterator::new(iters);
338 let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
339 ui.rewind().await.unwrap();
340
341 let mut i = 0;
342 while ui.is_valid() {
343 let key = ui.key();
344 let val = ui.value();
345 assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
346 assert_eq!(val, iterator_test_value_of(i).as_slice());
347 i += 1;
348 ui.next().await.unwrap();
349 if i == TEST_KEYS_COUNT * 3 {
350 assert!(!ui.is_valid());
351 break;
352 }
353 }
354 assert!(i >= TEST_KEYS_COUNT * 3);
355 }
356
357 #[tokio::test]
358 async fn test_seek() {
359 let sstable_store = mock_sstable_store().await;
360 let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
361 0,
362 default_builder_opt_for_test(),
363 |x| x * 3,
364 sstable_store.clone(),
365 TEST_KEYS_COUNT,
366 )
367 .await;
368 let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
369 1,
370 default_builder_opt_for_test(),
371 |x| x * 3 + 1,
372 sstable_store.clone(),
373 TEST_KEYS_COUNT,
374 )
375 .await;
376 let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
377 2,
378 default_builder_opt_for_test(),
379 |x| x * 3 + 2,
380 sstable_store.clone(),
381 TEST_KEYS_COUNT,
382 )
383 .await;
384 let read_options = Arc::new(SstableIteratorReadOptions::default());
385 let iters = vec![
386 SstableIterator::create(
387 table0,
388 sstable_store.clone(),
389 read_options.clone(),
390 &sstable_info_0,
391 ),
392 SstableIterator::create(
393 table1,
394 sstable_store.clone(),
395 read_options.clone(),
396 &sstable_info_1,
397 ),
398 SstableIterator::create(table2, sstable_store, read_options, &sstable_info_2),
399 ];
400
401 let mi = MergeIterator::new(iters);
402 let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
403
404 ui.seek(iterator_test_bytes_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
406 .await
407 .unwrap();
408 assert!(!ui.is_valid());
409
410 ui.seek(iterator_test_bytes_user_key_of(TEST_KEYS_COUNT + 5).as_ref())
412 .await
413 .unwrap();
414 let k = ui.key();
415 let v = ui.value();
416 assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 5).as_slice());
417 assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 5).to_ref());
418 ui.seek(iterator_test_bytes_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
419 .await
420 .unwrap();
421 let k = ui.key();
422 let v = ui.value();
423 assert_eq!(
424 v,
425 iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
426 );
427 assert_eq!(
428 k,
429 iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
430 );
431
432 ui.seek(iterator_test_bytes_user_key_of(0).as_ref())
434 .await
435 .unwrap();
436 let k = ui.key();
437 let v = ui.value();
438 assert_eq!(v, iterator_test_value_of(0).as_slice());
439 assert_eq!(k, iterator_test_bytes_key_of(0).to_ref());
440 }
441
442 #[tokio::test]
443 async fn test_delete() {
444 let sstable_store = mock_sstable_store().await;
445
446 let kv_pairs = vec![
448 (1, 100, HummockValue::put(iterator_test_value_of(1))),
449 (2, 300, HummockValue::delete()),
450 ];
451 let (table0, sstable_info_0) =
452 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
453
454 let kv_pairs = vec![
455 (1, 200, HummockValue::delete()),
456 (2, 400, HummockValue::put(iterator_test_value_of(2))),
457 ];
458 let (table1, sstable_info_1) =
459 gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;
460
461 let read_options = Arc::new(SstableIteratorReadOptions::default());
462 let iters = vec![
463 SstableIterator::create(
464 table0,
465 sstable_store.clone(),
466 read_options.clone(),
467 &sstable_info_0,
468 ),
469 SstableIterator::create(table1, sstable_store.clone(), read_options, &sstable_info_1),
470 ];
471
472 let mi = MergeIterator::new(iters);
473 let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
474 ui.rewind().await.unwrap();
475
476 let k = ui.key();
478 let v = ui.value();
479 assert_eq!(k, iterator_test_bytes_key_of_epoch(2, 400).to_ref());
480 assert_eq!(v, &Bytes::from(iterator_test_value_of(2)));
481
482 ui.next().await.unwrap();
484 assert!(!ui.is_valid());
485 }
486
487 async fn generate_test_data(sstable_store: SstableStoreRef) -> (TableHolder, SstableInfo) {
488 let kv_pairs = vec![
489 (0, 200, HummockValue::delete()),
490 (0, 100, HummockValue::put(iterator_test_value_of(0))),
491 (1, 200, HummockValue::put(iterator_test_value_of(1))),
492 (1, 100, HummockValue::delete()),
493 (2, 300, HummockValue::put(iterator_test_value_of(2))),
494 (2, 200, HummockValue::delete()),
495 (2, 100, HummockValue::delete()),
496 (3, 100, HummockValue::put(iterator_test_value_of(3))),
497 (5, 200, HummockValue::delete()),
498 (5, 100, HummockValue::put(iterator_test_value_of(5))),
499 (6, 100, HummockValue::put(iterator_test_value_of(6))),
500 (7, 200, HummockValue::delete()),
501 (7, 100, HummockValue::put(iterator_test_value_of(7))),
502 (8, 100, HummockValue::put(iterator_test_value_of(8))),
503 ];
504 let sst_info =
505 gen_iterator_test_sstable_with_range_tombstones(0, kv_pairs, sstable_store.clone())
506 .await;
507 (
508 sstable_store
509 .sstable(&sst_info, &mut StoreLocalStatistic::default())
510 .await
511 .unwrap(),
512 sst_info,
513 )
514 }
515
516 #[tokio::test]
518 async fn test_range_inclusive() {
519 let sstable_store = mock_sstable_store().await;
520 let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
522 let read_options = Arc::new(SstableIteratorReadOptions::default());
523 let iters = vec![SstableIterator::create(
524 table,
525 sstable_store,
526 read_options,
527 &sstable_info,
528 )];
529 let mi = MergeIterator::new(iters);
530
531 let begin_key = Included(iterator_test_bytes_user_key_of(2));
532 let end_key = Included(iterator_test_bytes_user_key_of(7));
533
534 let mut ui = UserIterator::for_test(mi, (begin_key, end_key));
535
536 ui.rewind().await.unwrap();
538 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
539 ui.next().await.unwrap();
540 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
541 ui.next().await.unwrap();
542 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
543 ui.next().await.unwrap();
544 assert!(!ui.is_valid());
545
546 ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
548 .await
549 .unwrap();
550 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
551 ui.next().await.unwrap();
552 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
553 ui.next().await.unwrap();
554 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
555 ui.next().await.unwrap();
556 assert!(!ui.is_valid());
557
558 ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
560 .await
561 .unwrap();
562 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
563 ui.next().await.unwrap();
564 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
565 ui.next().await.unwrap();
566 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
567 ui.next().await.unwrap();
568 assert!(!ui.is_valid());
569
570 ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
572 .await
573 .unwrap();
574 assert!(!ui.is_valid());
575
576 ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
578 .await
579 .unwrap();
580 assert!(!ui.is_valid());
581 }
582
583 #[tokio::test]
585 async fn test_range() {
586 let sstable_store = mock_sstable_store().await;
587 let kv_pairs = vec![
589 (0, 200, HummockValue::delete()),
590 (0, 100, HummockValue::put(iterator_test_value_of(0))),
591 (1, 200, HummockValue::put(iterator_test_value_of(1))),
592 (1, 100, HummockValue::delete()),
593 (2, 300, HummockValue::put(iterator_test_value_of(2))),
594 (2, 200, HummockValue::delete()),
595 (2, 100, HummockValue::delete()),
596 (3, 100, HummockValue::put(iterator_test_value_of(3))),
597 (5, 200, HummockValue::delete()),
598 (5, 100, HummockValue::put(iterator_test_value_of(5))),
599 (6, 100, HummockValue::put(iterator_test_value_of(6))),
600 (7, 100, HummockValue::put(iterator_test_value_of(7))),
601 (8, 100, HummockValue::put(iterator_test_value_of(8))),
602 ];
603 let (table, sstable_info) =
604 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
605 let read_options = Arc::new(SstableIteratorReadOptions::default());
606 let iters = vec![SstableIterator::create(
607 table.clone(),
608 sstable_store.clone(),
609 read_options.clone(),
610 &sstable_info,
611 )];
612 let mi = MergeIterator::new(iters);
613
614 let begin_key = Included(iterator_test_bytes_user_key_of(2));
615 let end_key = Excluded(iterator_test_bytes_user_key_of(7));
616
617 let mut ui = UserIterator::for_test(mi, (begin_key, end_key));
618
619 ui.rewind().await.unwrap();
621 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
622 ui.next().await.unwrap();
623 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
624 ui.next().await.unwrap();
625 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
626 ui.next().await.unwrap();
627 assert!(!ui.is_valid());
628
629 ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
631 .await
632 .unwrap();
633 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
634 ui.next().await.unwrap();
635 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
636 ui.next().await.unwrap();
637 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
638 ui.next().await.unwrap();
639 assert!(!ui.is_valid());
640
641 ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
643 .await
644 .unwrap();
645 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
646 ui.next().await.unwrap();
647 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
648 ui.next().await.unwrap();
649 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
650 ui.next().await.unwrap();
651 assert!(!ui.is_valid());
652
653 ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
655 .await
656 .unwrap();
657 assert!(!ui.is_valid());
658
659 ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
661 .await
662 .unwrap();
663 assert!(!ui.is_valid());
664
665 let iters = vec![SstableIterator::create(
666 table,
667 sstable_store,
668 read_options,
669 &sstable_info,
670 )];
671 let mi = MergeIterator::new(iters);
672
673 let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
674 let end_key = Excluded(iterator_test_bytes_user_key_of(7));
675
676 let mut ui = UserIterator::for_test(mi, (begin_key, end_key));
677 ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
679 .await
680 .unwrap();
681 assert!(ui.is_valid());
682 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
683 ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
684 .await
685 .unwrap();
686 assert!(ui.is_valid());
687 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
688 ui.seek(iterator_test_bytes_user_key_of(3).as_ref())
689 .await
690 .unwrap();
691 assert!(ui.is_valid());
692 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
693 ui.seek(iterator_test_bytes_user_key_of(4).as_ref())
694 .await
695 .unwrap();
696 assert!(ui.is_valid());
697 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
698 }
699
700 #[tokio::test]
702 async fn test_range_to_inclusive() {
703 let sstable_store = mock_sstable_store().await;
704 let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
707 let read_options = Arc::new(SstableIteratorReadOptions::default());
708 let iters = vec![SstableIterator::create(
709 table,
710 sstable_store,
711 read_options,
712 &sstable_info,
713 )];
714 let mi = MergeIterator::new(iters);
715 let end_key = Included(iterator_test_bytes_user_key_of(7));
716
717 let mut ui = UserIterator::for_test(mi, (Unbounded, end_key));
718
719 ui.rewind().await.unwrap();
721 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref());
722 ui.next().await.unwrap();
723 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
724 ui.next().await.unwrap();
725 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
726 ui.next().await.unwrap();
727 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
728 ui.next().await.unwrap();
729 assert!(!ui.is_valid());
730
731 ui.seek(iterator_test_bytes_user_key_of(0).as_ref())
733 .await
734 .unwrap();
735 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref());
736 ui.next().await.unwrap();
737 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
738 ui.next().await.unwrap();
739 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
740 ui.next().await.unwrap();
741 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
742 ui.next().await.unwrap();
743 assert!(!ui.is_valid());
744
745 ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
747 .await
748 .unwrap();
749 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
750 ui.next().await.unwrap();
751 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
752 ui.next().await.unwrap();
753 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
754 ui.next().await.unwrap();
755 assert!(!ui.is_valid());
756
757 ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
759 .await
760 .unwrap();
761 assert!(!ui.is_valid());
762
763 ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
765 .await
766 .unwrap();
767 assert!(!ui.is_valid());
768 }
769
770 #[tokio::test]
772 async fn test_range_from() {
773 let sstable_store = mock_sstable_store().await;
774 let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
776 let read_options = Arc::new(SstableIteratorReadOptions::default());
777 let iters = vec![SstableIterator::create(
778 table,
779 sstable_store,
780 read_options,
781 &sstable_info,
782 )];
783 let mi = MergeIterator::new(iters);
784 let begin_key = Included(iterator_test_bytes_user_key_of(2));
785
786 let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded));
787
788 ui.rewind().await.unwrap();
790 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
791 ui.next().await.unwrap();
792 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
793 ui.next().await.unwrap();
794 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
795 ui.next().await.unwrap();
796 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
797 ui.next().await.unwrap();
798 assert!(!ui.is_valid());
799
800 ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
802 .await
803 .unwrap();
804 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
805 ui.next().await.unwrap();
806 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
807 ui.next().await.unwrap();
808 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
809 ui.next().await.unwrap();
810 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
811 ui.next().await.unwrap();
812 assert!(!ui.is_valid());
813
814 ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
816 .await
817 .unwrap();
818 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
819 ui.next().await.unwrap();
820 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
821 ui.next().await.unwrap();
822 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
823 ui.next().await.unwrap();
824 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
825 ui.next().await.unwrap();
826 assert!(!ui.is_valid());
827
828 ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
830 .await
831 .unwrap();
832 assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
833 ui.next().await.unwrap();
834 assert!(!ui.is_valid());
835
836 ui.seek(iterator_test_bytes_user_key_of(9).as_ref())
838 .await
839 .unwrap();
840 assert!(!ui.is_valid());
841 }
842
843 #[tokio::test]
844 async fn test_min_epoch() {
845 let sstable_store = mock_sstable_store().await;
846 let read_options = Arc::new(SstableIteratorReadOptions::default());
847 let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
848 0,
849 default_builder_opt_for_test(),
850 |x| x * 3,
851 sstable_store.clone(),
852 TEST_KEYS_COUNT,
853 1,
854 )
855 .await;
856 let iters = vec![SstableIterator::create(
857 table0,
858 sstable_store.clone(),
859 read_options.clone(),
860 &sstable_info_0,
861 )];
862
863 let min_count = (TEST_KEYS_COUNT / 5) as u64;
864 let min_epoch = test_epoch(min_count);
865 let mi = MergeIterator::new(iters);
866 let mut ui =
867 UserIterator::for_test_with_epoch(mi, (Unbounded, Unbounded), u64::MAX, min_epoch);
868 ui.rewind().await.unwrap();
869
870 let mut i = 0;
871 while ui.is_valid() {
872 let key = ui.key();
873 let key_epoch = key.epoch_with_gap.pure_epoch();
874 assert!(key_epoch >= min_epoch);
875
876 i += 1;
877 ui.next().await.unwrap();
878 }
879
880 let expect_count = TEST_KEYS_COUNT - (min_epoch / test_epoch(1)) as usize + 1;
881 assert_eq!(i, expect_count);
882 }
883}