1use std::ops::Bound::*;
16
17use more_asserts::debug_assert_ge;
18use risingwave_common::must_match;
19use risingwave_common::util::epoch::MAX_SPILL_TIMES;
20use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
21use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
22
23use crate::hummock::HummockResult;
24use crate::hummock::iterator::{Forward, HummockIterator};
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::value::HummockValue;
27use crate::monitor::StoreLocalStatistic;
28
29pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
31 iterator: I,
33
34 full_key_tracker: FullKeyTracker<Vec<u8>, true>,
36
37 key_range: UserKeyRange,
39
40 read_epoch: HummockEpoch,
42
43 min_epoch: HummockEpoch,
45
46 _version: Option<PinnedVersion>,
48
49 stats: StoreLocalStatistic,
50
51 is_current_pos_valid: bool,
53}
54
55impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
57 pub(crate) fn new(
59 iterator: I,
60 key_range: UserKeyRange,
61 read_epoch: u64,
62 min_epoch: u64,
63 version: Option<PinnedVersion>,
64 ) -> Self {
65 Self {
66 iterator,
67 key_range,
68 read_epoch,
69 min_epoch,
70 stats: StoreLocalStatistic::default(),
71 _version: version,
72 is_current_pos_valid: false,
73 full_key_tracker: FullKeyTracker::new(FullKey::default()),
74 }
75 }
76
77 pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
79 let read_epoch = HummockEpoch::MAX;
80 Self::new(iterator, key_range, read_epoch, 0, None)
81 }
82
83 pub async fn next(&mut self) -> HummockResult<()> {
91 self.is_current_pos_valid = false;
93 self.iterator.next().await?;
95
96 self.try_advance_to_next_valid().await
98 }
99
100 pub fn key(&self) -> FullKey<&[u8]> {
108 assert!(self.is_valid());
109 self.full_key_tracker.latest_full_key.to_ref()
110 }
111
112 pub fn value(&self) -> &[u8] {
116 assert!(self.is_valid());
117 must_match!(self.iterator.value(), HummockValue::Put(val) => val)
118 }
119
120 pub async fn rewind(&mut self) -> HummockResult<()> {
122 self.is_current_pos_valid = false;
124 self.full_key_tracker = FullKeyTracker::new(FullKey::default());
125
126 match &self.key_range.0 {
128 Included(begin_key) | Excluded(begin_key) => {
129 let full_key = FullKey {
130 user_key: begin_key.as_ref(),
131 epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
132 };
133 self.iterator.seek(full_key).await?;
134 }
135 Unbounded => {
136 self.iterator.rewind().await?;
137 }
138 };
139
140 self.try_advance_to_next_valid().await?;
141 if let Excluded(begin_key) = &self.key_range.0
142 && self.is_valid()
143 && self.key().user_key == begin_key.as_ref()
144 {
145 self.next().await?;
146 }
147 Ok(())
148 }
149
150 pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
152 self.is_current_pos_valid = false;
154 self.full_key_tracker = FullKeyTracker::new(FullKey::default());
155
156 let seek_key = match &self.key_range.0 {
158 Included(begin_key) | Excluded(begin_key) => {
159 let begin_key = begin_key.as_ref();
160 if begin_key > user_key {
161 begin_key
162 } else {
163 user_key
164 }
165 }
166 Unbounded => user_key,
167 };
168
169 let full_key = FullKey {
170 user_key: seek_key,
171 epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
172 };
173 self.iterator.seek(full_key).await?;
174
175 self.try_advance_to_next_valid().await?;
176 if let Excluded(begin_key) = &self.key_range.0
177 && self.is_valid()
178 && self.key().user_key == begin_key.as_ref()
179 {
180 debug_assert_ge!(begin_key.as_ref(), user_key);
181 self.next().await?;
182 }
183 Ok(())
184 }
185
186 pub fn is_valid(&self) -> bool {
188 self.is_current_pos_valid
189 }
190
191 pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
192 stats.add(&self.stats);
193 self.iterator.collect_local_statistic(stats);
194 }
195
196 async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
199 loop {
200 if !self.iterator.is_valid() {
201 break;
202 }
203
204 let full_key = self.iterator.key();
205 let epoch = full_key.epoch_with_gap.pure_epoch();
206
207 if epoch < self.min_epoch || epoch > self.read_epoch {
209 self.iterator.next().await?;
210 continue;
211 }
212
213 if !self.full_key_tracker.observe(full_key) {
215 self.stats.skip_multi_version_key_count += 1;
216 self.iterator.next().await?;
217 continue;
218 }
219
220 if self.user_key_out_of_range(full_key.user_key) {
223 break;
224 }
225
226 match self.iterator.value() {
228 HummockValue::Put(_val) => {
229 self.stats.processed_key_count += 1;
230 self.is_current_pos_valid = true;
231 return Ok(());
232 }
233 HummockValue::Delete => {
237 self.stats.skip_delete_key_count += 1;
238 }
239 }
240 self.iterator.next().await?;
241 }
242
243 self.is_current_pos_valid = false;
244 Ok(())
245 }
246
247 fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
249 match &self.key_range.1 {
251 Included(end_key) => user_key > end_key.as_ref(),
252 Excluded(end_key) => user_key >= end_key.as_ref(),
253 Unbounded => false,
254 }
255 }
256}
257
#[cfg(test)]
impl<I> UserIterator<I>
where
    I: HummockIterator<Direction = Forward>,
{
    /// Test-only constructor that lets the caller choose the epoch window
    /// (`min_epoch..=read_epoch`); no pinned version is attached.
    pub(crate) fn for_test_with_epoch(
        iterator: I,
        key_range: UserKeyRange,
        read_epoch: u64,
        min_epoch: u64,
    ) -> Self {
        let no_version = None;
        Self::new(iterator, key_range, read_epoch, min_epoch, no_version)
    }
}
269
#[cfg(test)]
mod tests {
    use std::ops::Bound::*;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::*;
    use crate::hummock::TableHolder;
    use crate::hummock::iterator::MergeIterator;
    use crate::hummock::iterator::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
        gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
        gen_iterator_test_sstable_with_range_tombstones, iterator_test_bytes_key_of,
        iterator_test_bytes_key_of_epoch, iterator_test_bytes_user_key_of, iterator_test_value_of,
        mock_sstable_store,
    };
    use crate::hummock::sstable::{
        SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
    };
    use crate::hummock::sstable_store::SstableStoreRef;

    /// Builds a merge iterator over three SSTs whose key indices are
    /// `3x`, `3x + 1` and `3x + 2` for `x` in `0..TEST_KEYS_COUNT`.
    async fn interleaved_merge_iter() -> impl HummockIterator<Direction = Forward> {
        let sstable_store = mock_sstable_store().await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());

        let first = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let second = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let third = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;

        let iters: Vec<_> = [first, second, third]
            .into_iter()
            .map(|(table, sstable_info)| {
                SstableIterator::create(
                    table,
                    sstable_store.clone(),
                    read_options.clone(),
                    &sstable_info,
                )
            })
            .collect();
        MergeIterator::new(iters)
    }

    /// Builds a merge iterator over the single SST from `generate_test_data`.
    async fn fixture_merge_iter() -> impl HummockIterator<Direction = Forward> {
        let sstable_store = mock_sstable_store().await;
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        MergeIterator::new(vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            &sstable_info,
        )])
    }

    /// A full scan over three interleaved SSTs yields every key in order.
    #[tokio::test]
    async fn test_basic() {
        let mi = interleaved_merge_iter().await;
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        let total = TEST_KEYS_COUNT * 3;
        let mut seen = 0;
        while ui.is_valid() {
            let key = ui.key();
            let val = ui.value();
            assert_eq!(key, iterator_test_bytes_key_of(seen).to_ref());
            assert_eq!(val, iterator_test_value_of(seen).as_slice());
            seen += 1;
            ui.next().await.unwrap();
            if seen == total {
                assert!(!ui.is_valid());
                break;
            }
        }
        assert!(seen >= total);
    }

    /// Seeking lands on the requested key, or is invalid past the last key.
    #[tokio::test]
    async fn test_seek() {
        let mi = interleaved_merge_iter().await;
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));

        // One past the largest key index: nothing to return.
        ui.seek(iterator_test_bytes_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // Seeks in the middle, near the end, and back to the very first key.
        for idx in [TEST_KEYS_COUNT + 5, 2 * TEST_KEYS_COUNT + 5, 0] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            let k = ui.key();
            let v = ui.value();
            assert_eq!(v, iterator_test_value_of(idx).as_slice());
            assert_eq!(k, iterator_test_bytes_key_of(idx).to_ref());
        }
    }

    /// A newer delete hides an older put; a newer put overrides an older delete.
    #[tokio::test]
    async fn test_delete() {
        let sstable_store = mock_sstable_store().await;

        let older = vec![
            (1, 100, HummockValue::put(iterator_test_value_of(1))),
            (2, 300, HummockValue::delete()),
        ];
        let (table0, sstable_info_0) =
            gen_iterator_test_sstable_from_kv_pair(0, older, sstable_store.clone()).await;

        let newer = vec![
            (1, 200, HummockValue::delete()),
            (2, 400, HummockValue::put(iterator_test_value_of(2))),
        ];
        let (table1, sstable_info_1) =
            gen_iterator_test_sstable_from_kv_pair(1, newer, sstable_store.clone()).await;

        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(table1, sstable_store.clone(), read_options, &sstable_info_1),
        ];

        let mut ui = UserIterator::for_test(MergeIterator::new(iters), (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        // Key 1 is deleted at epoch 200, so key 2 @ 400 is the only survivor.
        let k = ui.key();
        let v = ui.value();
        assert_eq!(k, iterator_test_bytes_key_of_epoch(2, 400).to_ref());
        assert_eq!(v, &Bytes::from(iterator_test_value_of(2)));

        ui.next().await.unwrap();
        assert!(!ui.is_valid());
    }

    /// Shared fixture: one SST mixing puts and deletes across several epochs.
    async fn generate_test_data(sstable_store: SstableStoreRef) -> (TableHolder, SstableInfo) {
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let sst_info =
            gen_iterator_test_sstable_with_range_tombstones(0, kv_pairs, sstable_store.clone())
                .await;
        let mut stats = StoreLocalStatistic::default();
        let table = sstable_store.sstable(&sst_info, &mut stats).await.unwrap();
        (table, sst_info)
    }

    /// Range `[2, 7]` over the shared fixture.
    #[tokio::test]
    async fn test_range_inclusive() {
        let mi = fixture_merge_iter().await;

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Included(iterator_test_bytes_user_key_of(7));
        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        let expected = [(2, 300), (3, 100), (6, 100)];

        // `None` rewinds; `Some(i)` seeks to user key `i`. All three starting
        // points must produce the same scan.
        for start in [None, Some(1), Some(2)] {
            match start {
                Some(idx) => ui
                    .seek(iterator_test_bytes_user_key_of(idx).as_ref())
                    .await
                    .unwrap(),
                None => ui.rewind().await.unwrap(),
            }
            for (idx, epoch) in expected {
                assert_eq!(
                    ui.key(),
                    iterator_test_bytes_key_of_epoch(idx, epoch).to_ref()
                );
                ui.next().await.unwrap();
            }
            assert!(!ui.is_valid());
        }

        // Key 7 is deleted in the fixture and key 8 is past the upper bound.
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }
    }

    /// Range `[2, 7)` and then `(2, 7)` over a dedicated SST.
    #[tokio::test]
    async fn test_range() {
        let sstable_store = mock_sstable_store().await;
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (table, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let new_merge_iter = || {
            MergeIterator::new(vec![SstableIterator::create(
                table.clone(),
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info,
            )])
        };

        // ----- [2, 7) -----
        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));
        let mut ui = UserIterator::for_test(new_merge_iter(), (begin_key, end_key));

        let expected = [(2, 300), (3, 100), (6, 100)];

        // `None` rewinds; `Some(i)` seeks to user key `i`.
        for start in [None, Some(1), Some(2)] {
            match start {
                Some(idx) => ui
                    .seek(iterator_test_bytes_user_key_of(idx).as_ref())
                    .await
                    .unwrap(),
                None => ui.rewind().await.unwrap(),
            }
            for (idx, epoch) in expected {
                assert_eq!(
                    ui.key(),
                    iterator_test_bytes_key_of_epoch(idx, epoch).to_ref()
                );
                ui.next().await.unwrap();
            }
            assert!(!ui.is_valid());
        }

        // Seeking to the excluded end bound or beyond finds nothing.
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }

        // ----- (2, 7) -----
        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));
        let mut ui = UserIterator::for_test(new_merge_iter(), (begin_key, end_key));

        // (seek target, index of the key the iterator must land on)
        for (seek_idx, found_idx) in [(1, 3), (2, 3), (3, 3), (4, 6)] {
            ui.seek(iterator_test_bytes_user_key_of(seek_idx).as_ref())
                .await
                .unwrap();
            assert!(ui.is_valid());
            assert_eq!(
                ui.key(),
                iterator_test_bytes_key_of_epoch(found_idx, 100).to_ref()
            );
        }
    }

    /// Range `(-inf, 7]` over the shared fixture.
    #[tokio::test]
    async fn test_range_to_inclusive() {
        let mi = fixture_merge_iter().await;
        let end_key = Included(iterator_test_bytes_user_key_of(7));
        let mut ui = UserIterator::for_test(mi, (Unbounded, end_key));

        let expected = [(1, 200), (2, 300), (3, 100), (6, 100)];

        // (start, number of leading `expected` entries the start skips);
        // `None` rewinds, `Some(i)` seeks to user key `i`.
        for (start, skipped) in [(None, 0), (Some(0), 0), (Some(2), 1)] {
            match start {
                Some(idx) => ui
                    .seek(iterator_test_bytes_user_key_of(idx).as_ref())
                    .await
                    .unwrap(),
                None => ui.rewind().await.unwrap(),
            }
            for &(idx, epoch) in expected.iter().skip(skipped) {
                assert_eq!(
                    ui.key(),
                    iterator_test_bytes_key_of_epoch(idx, epoch).to_ref()
                );
                ui.next().await.unwrap();
            }
            assert!(!ui.is_valid());
        }

        // Key 7 is deleted in the fixture and key 8 is past the upper bound.
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }
    }

    /// Range `[2, +inf)` over the shared fixture.
    #[tokio::test]
    async fn test_range_from() {
        let mi = fixture_merge_iter().await;
        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded));

        let expected = [(2, 300), (3, 100), (6, 100), (8, 100)];

        // (start, number of leading `expected` entries the start skips);
        // `None` rewinds, `Some(i)` seeks to user key `i`.
        for (start, skipped) in [(None, 0), (Some(1), 0), (Some(2), 0), (Some(8), 3)] {
            match start {
                Some(idx) => ui
                    .seek(iterator_test_bytes_user_key_of(idx).as_ref())
                    .await
                    .unwrap(),
                None => ui.rewind().await.unwrap(),
            }
            for &(idx, epoch) in expected.iter().skip(skipped) {
                assert_eq!(
                    ui.key(),
                    iterator_test_bytes_key_of_epoch(idx, epoch).to_ref()
                );
                ui.next().await.unwrap();
            }
            assert!(!ui.is_valid());
        }

        // Beyond the last key of the fixture.
        ui.seek(iterator_test_bytes_user_key_of(9).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());
    }

    /// Entries older than `min_epoch` are never returned.
    #[tokio::test]
    async fn test_min_epoch() {
        let sstable_store = mock_sstable_store().await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
            1,
        )
        .await;
        let iters = vec![SstableIterator::create(
            table0,
            sstable_store.clone(),
            read_options.clone(),
            &sstable_info_0,
        )];

        let min_count = (TEST_KEYS_COUNT / 5) as u64;
        let min_epoch = test_epoch(min_count);
        let mut ui = UserIterator::for_test_with_epoch(
            MergeIterator::new(iters),
            (Unbounded, Unbounded),
            u64::MAX,
            min_epoch,
        );
        ui.rewind().await.unwrap();

        let mut returned = 0;
        while ui.is_valid() {
            let key_epoch = ui.key().epoch_with_gap.pure_epoch();
            assert!(key_epoch >= min_epoch);

            returned += 1;
            ui.next().await.unwrap();
        }

        let expect_count = TEST_KEYS_COUNT - (min_epoch / test_epoch(1)) as usize + 1;
        assert_eq!(returned, expect_count);
    }
}