1use std::ops::Bound::*;
16
17use bytes::Bytes;
18use more_asserts::debug_assert_le;
19use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange};
20use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
21
22use crate::hummock::HummockResult;
23use crate::hummock::iterator::{Backward, HummockIterator};
24use crate::hummock::local_version::pinned_version::PinnedVersion;
25use crate::hummock::value::HummockValue;
26use crate::monitor::StoreLocalStatistic;
27
28pub struct BackwardUserIterator<I: HummockIterator<Direction = Backward>> {
30 iterator: I,
32
33 just_met_new_key: bool,
35
36 last_key: FullKey<Bytes>,
38
39 last_val: Bytes,
41
42 last_delete: bool,
44
45 out_of_range: bool,
47
48 key_range: UserKeyRange,
50
51 read_epoch: HummockEpoch,
53
54 min_epoch: HummockEpoch,
56
57 _version: Option<PinnedVersion>,
59
60 stats: StoreLocalStatistic,
62}
63
64impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
65 pub fn new(
67 iterator: I,
68 key_range: UserKeyRange,
69 read_epoch: u64,
70 min_epoch: u64,
71 version: Option<PinnedVersion>,
72 ) -> Self {
73 Self {
74 iterator,
75 out_of_range: false,
76 key_range,
77 just_met_new_key: false,
78 last_key: FullKey::default(),
79 last_val: Bytes::new(),
80 last_delete: true,
81 read_epoch,
82 min_epoch,
83 stats: StoreLocalStatistic::default(),
84 _version: version,
85 }
86 }
87
88 fn out_of_range(&self, key: UserKey<&[u8]>) -> bool {
89 match &self.key_range.0 {
90 Included(begin_key) => key < begin_key.as_ref(),
91 Excluded(begin_key) => key <= begin_key.as_ref(),
92 Unbounded => false,
93 }
94 }
95
96 fn reset(&mut self) {
97 self.last_key = FullKey::default();
98 self.just_met_new_key = false;
99 self.last_delete = true;
100 self.out_of_range = false;
101 }
102
103 pub async fn next(&mut self) -> HummockResult<()> {
110 if !self.iterator.is_valid() {
131 self.last_delete = true;
134 return Ok(());
135 }
136
137 while self.iterator.is_valid() {
138 let full_key = self.iterator.key();
139 let epoch = full_key.epoch_with_gap.pure_epoch();
140 let key = &full_key.user_key;
141
142 if epoch > self.min_epoch && epoch <= self.read_epoch {
143 if self.just_met_new_key {
144 self.last_key = full_key.copy_into();
145 self.just_met_new_key = false;
146 if self.out_of_range(self.last_key.user_key.as_ref()) {
148 self.out_of_range = true;
149 break;
150 }
151 } else if self.last_key.user_key.as_ref() != *key {
152 if !self.last_delete {
153 self.just_met_new_key = true;
156 self.stats.processed_key_count += 1;
157 return Ok(());
158 } else {
159 self.last_key = full_key.copy_into();
161 if self.out_of_range(self.last_key.user_key.as_ref()) {
163 self.out_of_range = true;
164 break;
165 }
166 }
167 } else {
168 self.stats.skip_multi_version_key_count += 1;
169 }
170 match self.iterator.value() {
176 HummockValue::Put(val) => {
177 self.last_key = full_key.copy_into();
179 self.last_val = Bytes::copy_from_slice(val);
180 self.last_delete = false;
181 }
182 HummockValue::Delete => {
183 self.last_delete = true;
184 }
185 }
186 }
187 self.iterator.next().await?;
188 }
189 Ok(()) }
191
192 pub fn key(&self) -> FullKey<&[u8]> {
200 assert!(self.is_valid());
201 self.last_key.to_ref()
202 }
203
204 pub fn value(&self) -> &[u8] {
208 assert!(self.is_valid());
209 &self.last_val
210 }
211
212 pub async fn rewind(&mut self) -> HummockResult<()> {
214 match &self.key_range.1 {
216 Included(end_key) | Excluded(end_key) => {
217 let full_key = FullKey {
218 user_key: end_key.as_ref(),
219 epoch_with_gap: EpochWithGap::new_min_epoch(),
220 };
221 self.iterator.seek(full_key).await?;
222 }
223 Unbounded => self.iterator.rewind().await?,
224 };
225
226 self.reset();
228 self.next().await?;
230 if let Excluded(end_key) = &self.key_range.1
231 && self.is_valid()
232 && self.key().user_key == end_key.as_ref()
233 {
234 self.next().await?;
235 }
236 Ok(())
237 }
238
239 pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
241 let seek_key = match &self.key_range.1 {
243 Included(end_key) | Excluded(end_key) => {
244 let end_key = end_key.as_ref();
245 if end_key < user_key {
246 end_key
247 } else {
248 user_key
249 }
250 }
251 Unbounded => user_key,
252 };
253 let full_key = FullKey {
254 user_key: seek_key,
255 epoch_with_gap: EpochWithGap::new_min_epoch(),
256 };
257 self.iterator.seek(full_key).await?;
258
259 self.reset();
261 self.next().await?;
263 if let Excluded(end_key) = &self.key_range.1
264 && self.is_valid()
265 && self.key().user_key == end_key.as_ref()
266 {
267 debug_assert_le!(end_key.as_ref(), user_key);
268 self.next().await?;
269 }
270 Ok(())
271 }
272
273 pub fn is_valid(&self) -> bool {
275 let has_enough_input = self.iterator.is_valid() || !self.last_delete;
280 has_enough_input && (!self.out_of_range)
281 }
282
283 pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
284 stats.add(&self.stats);
285 self.iterator.collect_local_statistic(stats);
286 }
287}
288
289#[cfg(test)]
290impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
291 pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
293 Self::new(iterator, key_range, HummockEpoch::MAX, 0, None)
294 }
295
296 pub(crate) fn with_min_epoch(
298 iterator: I,
299 key_range: UserKeyRange,
300 min_epoch: HummockEpoch,
301 ) -> Self {
302 Self::new(iterator, key_range, HummockEpoch::MAX, min_epoch, None)
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use std::cmp::Reverse;
309 use std::collections::BTreeMap;
310 use std::ops::Bound::{self, *};
311
312 use rand::distr::Alphanumeric;
313 use rand::{Rng, rng as thread_rng};
314 use risingwave_common::catalog::TableId;
315 use risingwave_common::util::epoch::{EpochExt, test_epoch};
316 use risingwave_hummock_sdk::key::prev_key;
317 use risingwave_hummock_sdk::sstable_info::SstableInfo;
318
319 use super::*;
320 use crate::hummock::iterator::MergeIterator;
321 use crate::hummock::iterator::test_utils::{
322 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
323 gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
324 iterator_test_bytes_key_of, iterator_test_bytes_key_of_epoch,
325 iterator_test_bytes_user_key_of, iterator_test_user_key_of, iterator_test_value_of,
326 mock_sstable_store,
327 };
328 use crate::hummock::test_utils::gen_test_sstable;
329 use crate::hummock::{BackwardSstableIterator, SstableStoreRef, TableHolder};
330
331 #[tokio::test]
332 async fn test_backward_user_basic() {
333 let sstable_store = mock_sstable_store().await;
334 let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
335 0,
336 default_builder_opt_for_test(),
337 |x| x * 3 + 1,
338 sstable_store.clone(),
339 TEST_KEYS_COUNT,
340 )
341 .await;
342 let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
343 1,
344 default_builder_opt_for_test(),
345 |x| x * 3 + 2,
346 sstable_store.clone(),
347 TEST_KEYS_COUNT,
348 )
349 .await;
350 let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
351 2,
352 default_builder_opt_for_test(),
353 |x| x * 3 + 3,
354 sstable_store.clone(),
355 TEST_KEYS_COUNT,
356 )
357 .await;
358
359 let backward_iters = vec![
360 BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
361 BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
362 BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
363 ];
364
365 let mi = MergeIterator::new(backward_iters);
366 let mut ui = BackwardUserIterator::for_test(mi, (Unbounded, Unbounded));
367 let mut i = 3 * TEST_KEYS_COUNT;
368 ui.rewind().await.unwrap();
369 while ui.is_valid() {
370 let key = ui.key();
371 let val = ui.value();
372 assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
373 assert_eq!(val, iterator_test_value_of(i).as_slice());
374 i -= 1;
375 ui.next().await.unwrap();
376 if i == 0 {
377 assert!(!ui.is_valid());
378 break;
379 }
380 }
381 }
382
383 #[tokio::test]
384 async fn test_backward_user_seek() {
385 let sstable_store = mock_sstable_store().await;
386 let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
387 0,
388 default_builder_opt_for_test(),
389 |x| x * 3 + 1,
390 sstable_store.clone(),
391 TEST_KEYS_COUNT,
392 )
393 .await;
394 let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
395 1,
396 default_builder_opt_for_test(),
397 |x| x * 3 + 2,
398 sstable_store.clone(),
399 TEST_KEYS_COUNT,
400 )
401 .await;
402 let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
403 2,
404 default_builder_opt_for_test(),
405 |x| x * 3 + 3,
406 sstable_store.clone(),
407 TEST_KEYS_COUNT,
408 )
409 .await;
410 let backward_iters = vec![
411 BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
412 BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
413 BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
414 ];
415
416 let bmi = MergeIterator::new(backward_iters);
417 let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
418
419 bui.seek(iterator_test_user_key_of(0).as_ref())
421 .await
422 .unwrap();
423 assert!(!bui.is_valid());
424
425 bui.seek(iterator_test_user_key_of(TEST_KEYS_COUNT + 4).as_ref())
427 .await
428 .unwrap();
429 let k = bui.key();
430 let v = bui.value();
431 assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 4).as_slice());
432 assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 4).to_ref());
433 bui.seek(iterator_test_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
434 .await
435 .unwrap();
436 let k = bui.key();
437 let v = bui.value();
438 assert_eq!(
439 v,
440 iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
441 );
442 assert_eq!(
443 k,
444 iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
445 );
446
447 bui.seek(iterator_test_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
449 .await
450 .unwrap();
451 let k = bui.key();
452 let v = bui.value();
453 assert_eq!(v, iterator_test_value_of(3 * TEST_KEYS_COUNT).as_slice());
454 assert_eq!(k, iterator_test_bytes_key_of(3 * TEST_KEYS_COUNT).to_ref());
455 }
456
457 #[tokio::test]
458 async fn test_backward_user_delete() {
459 let sstable_store = mock_sstable_store().await;
460 let kv_pairs = vec![
462 (1, 300, HummockValue::delete()),
463 (2, 100, HummockValue::put(iterator_test_value_of(2))),
464 ];
465 let (table0, sstable_info_0) =
466 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
467
468 let kv_pairs = vec![
469 (1, 400, HummockValue::put(iterator_test_value_of(1))),
470 (2, 200, HummockValue::delete()),
471 ];
472 let (table1, sstable_info_1) =
473 gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;
474 let backward_iters = vec![
475 BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
476 BackwardSstableIterator::new(table1, sstable_store, &sstable_info_1),
477 ];
478 let bmi = MergeIterator::new(backward_iters);
479 let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
480
481 bui.rewind().await.unwrap();
482
483 let k = bui.key();
485 let v = bui.value();
486
487 assert_eq!(k, iterator_test_bytes_key_of_epoch(1, 400).to_ref());
488 assert_eq!(v, iterator_test_value_of(1).as_slice());
489
490 bui.next().await.unwrap();
492 assert!(!bui.is_valid());
493 }
494
495 #[tokio::test]
497 async fn test_backward_user_range_inclusive() {
498 let sstable_store = mock_sstable_store().await;
499 let kv_pairs = vec![
501 (0, 200, HummockValue::delete()),
502 (0, 100, HummockValue::put(iterator_test_value_of(0))),
503 (1, 200, HummockValue::put(iterator_test_value_of(1))),
504 (1, 100, HummockValue::delete()),
505 (2, 400, HummockValue::delete()),
506 (2, 300, HummockValue::put(iterator_test_value_of(2))),
507 (2, 200, HummockValue::delete()),
508 (2, 100, HummockValue::put(iterator_test_value_of(2))),
509 (3, 100, HummockValue::put(iterator_test_value_of(3))),
510 (5, 200, HummockValue::delete()),
511 (5, 100, HummockValue::put(iterator_test_value_of(5))),
512 (6, 100, HummockValue::put(iterator_test_value_of(6))),
513 (7, 300, HummockValue::put(iterator_test_value_of(7))),
514 (7, 200, HummockValue::delete()),
515 (7, 100, HummockValue::put(iterator_test_value_of(7))),
516 (8, 100, HummockValue::put(iterator_test_value_of(8))),
517 ];
518 let (sstable, sstable_info) =
519 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
520 let backward_iters = vec![BackwardSstableIterator::new(
521 sstable,
522 sstable_store,
523 &sstable_info,
524 )];
525 let bmi = MergeIterator::new(backward_iters);
526
527 let begin_key = Included(iterator_test_bytes_user_key_of(2));
528 let end_key = Included(iterator_test_bytes_user_key_of(7));
529
530 let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
531
532 bui.rewind().await.unwrap();
534 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
535 bui.next().await.unwrap();
536 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
537 bui.next().await.unwrap();
538 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
539 bui.next().await.unwrap();
540 assert!(!bui.is_valid());
541
542 bui.seek(iterator_test_user_key_of(8).as_ref())
544 .await
545 .unwrap();
546 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
547 bui.next().await.unwrap();
548 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
549 bui.next().await.unwrap();
550 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
551 bui.next().await.unwrap();
552 assert!(!bui.is_valid());
553
554 bui.seek(iterator_test_user_key_of(7).as_ref())
556 .await
557 .unwrap();
558 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
559 bui.next().await.unwrap();
560 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
561 bui.next().await.unwrap();
562 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
563 bui.next().await.unwrap();
564 assert!(!bui.is_valid());
565
566 bui.seek(iterator_test_user_key_of(2).as_ref())
568 .await
569 .unwrap();
570 assert!(!bui.is_valid());
571
572 bui.seek(iterator_test_user_key_of(1).as_ref())
574 .await
575 .unwrap();
576 assert!(!bui.is_valid());
577 }
578
579 #[tokio::test]
581 async fn test_backward_user_range() {
582 let sstable_store = mock_sstable_store().await;
583 let kv_pairs = vec![
585 (0, 200, HummockValue::delete()),
586 (0, 100, HummockValue::put(iterator_test_value_of(0))),
587 (1, 200, HummockValue::put(iterator_test_value_of(1))),
588 (1, 100, HummockValue::delete()),
589 (2, 300, HummockValue::put(iterator_test_value_of(2))),
590 (2, 200, HummockValue::delete()),
591 (2, 100, HummockValue::delete()),
592 (3, 100, HummockValue::put(iterator_test_value_of(3))),
593 (5, 200, HummockValue::delete()),
594 (5, 100, HummockValue::put(iterator_test_value_of(5))),
595 (6, 100, HummockValue::put(iterator_test_value_of(6))),
596 (7, 100, HummockValue::put(iterator_test_value_of(7))),
597 (8, 100, HummockValue::put(iterator_test_value_of(8))),
598 ];
599 let (sstable, sstable_info) =
600 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
601 let backward_iters = vec![BackwardSstableIterator::new(
602 sstable.clone(),
603 sstable_store.clone(),
604 &sstable_info,
605 )];
606 let bmi = MergeIterator::new(backward_iters);
607
608 let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
609 let end_key = Included(iterator_test_bytes_user_key_of(7));
610
611 let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
612
613 bui.rewind().await.unwrap();
615 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
616 bui.next().await.unwrap();
617 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
618 bui.next().await.unwrap();
619 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
620 bui.next().await.unwrap();
621 assert!(!bui.is_valid());
622
623 bui.seek(iterator_test_user_key_of(8).as_ref())
625 .await
626 .unwrap();
627 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
628 bui.next().await.unwrap();
629 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
630 bui.next().await.unwrap();
631 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
632 bui.next().await.unwrap();
633 assert!(!bui.is_valid());
634
635 bui.seek(iterator_test_user_key_of(7).as_ref())
637 .await
638 .unwrap();
639 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
640 bui.next().await.unwrap();
641 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
642 bui.next().await.unwrap();
643 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
644 bui.next().await.unwrap();
645 assert!(!bui.is_valid());
646
647 bui.seek(iterator_test_user_key_of(2).as_ref())
649 .await
650 .unwrap();
651 assert!(!bui.is_valid());
652
653 bui.seek(iterator_test_user_key_of(1).as_ref())
655 .await
656 .unwrap();
657 assert!(!bui.is_valid());
658
659 let backward_iters = vec![BackwardSstableIterator::new(
660 sstable,
661 sstable_store,
662 &sstable_info,
663 )];
664 let bmi = MergeIterator::new(backward_iters);
665
666 let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
667 let end_key = Excluded(iterator_test_bytes_user_key_of(7));
668
669 let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
670 bui.rewind().await.unwrap();
671 assert!(bui.is_valid());
672 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
673 bui.seek(iterator_test_user_key_of(7).as_ref())
675 .await
676 .unwrap();
677 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
678 bui.seek(iterator_test_user_key_of(5).as_ref())
679 .await
680 .unwrap();
681 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
682 }
683
684 #[tokio::test]
686 async fn test_backward_user_range_to_inclusive() {
687 let sstable_store = mock_sstable_store().await;
688 let kv_pairs = vec![
690 (0, 200, HummockValue::delete()),
691 (0, 100, HummockValue::put(iterator_test_value_of(0))),
692 (1, 200, HummockValue::put(iterator_test_value_of(1))),
693 (1, 100, HummockValue::delete()),
694 (2, 300, HummockValue::put(iterator_test_value_of(2))),
695 (2, 200, HummockValue::delete()),
696 (2, 100, HummockValue::delete()),
697 (3, 100, HummockValue::put(iterator_test_value_of(3))),
698 (5, 200, HummockValue::delete()),
699 (5, 100, HummockValue::put(iterator_test_value_of(5))),
700 (6, 100, HummockValue::put(iterator_test_value_of(6))),
701 (7, 200, HummockValue::delete()),
702 (7, 100, HummockValue::put(iterator_test_value_of(7))),
703 (8, 100, HummockValue::put(iterator_test_value_of(8))),
704 ];
705 let (sstable, sstable_info) =
706 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
707 let backward_iters = vec![BackwardSstableIterator::new(
708 sstable.clone(),
709 sstable_store.clone(),
710 &sstable_info,
711 )];
712 let bmi = MergeIterator::new(backward_iters);
713 let end_key = Included(iterator_test_bytes_user_key_of(7));
714
715 let mut bui = BackwardUserIterator::for_test(
716 bmi,
717 (Included(iterator_test_bytes_user_key_of(2)), end_key),
718 );
719
720 bui.rewind().await.unwrap();
722 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
723 bui.next().await.unwrap();
724 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
725 bui.next().await.unwrap();
726 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
727 bui.next().await.unwrap();
728 assert!(!bui.is_valid());
729
730 bui.seek(iterator_test_user_key_of(7).as_ref())
732 .await
733 .unwrap();
734 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
735 bui.next().await.unwrap();
736 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
737 bui.next().await.unwrap();
738 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
739 bui.next().await.unwrap();
740 assert!(!bui.is_valid());
741
742 bui.seek(iterator_test_user_key_of(6).as_ref())
744 .await
745 .unwrap();
746 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
747 bui.next().await.unwrap();
748 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
749 bui.next().await.unwrap();
750 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
751 bui.next().await.unwrap();
752 assert!(!bui.is_valid());
753
754 bui.seek(iterator_test_user_key_of(0).as_ref())
756 .await
757 .unwrap();
758 assert!(!bui.is_valid());
759
760 let end_key = Excluded(iterator_test_bytes_user_key_of(6));
761 let backward_iters = vec![BackwardSstableIterator::new(
762 sstable,
763 sstable_store,
764 &sstable_info,
765 )];
766 let bmi = MergeIterator::new(backward_iters);
767 let mut bui = BackwardUserIterator::for_test(
768 bmi,
769 (Excluded(iterator_test_bytes_user_key_of(2)), end_key),
770 );
771 bui.seek(iterator_test_user_key_of(6).as_ref())
773 .await
774 .unwrap();
775 assert!(bui.is_valid());
776 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
777 bui.next().await.unwrap();
778 assert!(!bui.is_valid());
779 bui.seek(iterator_test_user_key_of(7).as_ref())
780 .await
781 .unwrap();
782 assert!(bui.is_valid());
783 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
784 }
785
786 #[tokio::test]
788 async fn test_backward_user_range_from() {
789 let sstable_store = mock_sstable_store().await;
790 let kv_pairs = vec![
792 (0, 200, HummockValue::delete()),
793 (0, 100, HummockValue::put(iterator_test_value_of(0))),
794 (1, 200, HummockValue::put(iterator_test_value_of(1))),
795 (1, 100, HummockValue::delete()),
796 (2, 300, HummockValue::put(iterator_test_value_of(2))),
797 (2, 200, HummockValue::delete()),
798 (2, 100, HummockValue::delete()),
799 (3, 100, HummockValue::put(iterator_test_value_of(3))),
800 (5, 200, HummockValue::delete()),
801 (5, 100, HummockValue::put(iterator_test_value_of(5))),
802 (6, 100, HummockValue::put(iterator_test_value_of(6))),
803 (7, 200, HummockValue::delete()),
804 (7, 100, HummockValue::put(iterator_test_value_of(7))),
805 (8, 100, HummockValue::put(iterator_test_value_of(8))),
806 ];
807 let (handle, sstable_info) =
808 gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
809 let backward_iters = vec![BackwardSstableIterator::new(
810 handle,
811 sstable_store,
812 &sstable_info,
813 )];
814 let bmi = MergeIterator::new(backward_iters);
815 let begin_key = Included(iterator_test_bytes_user_key_of(2));
816
817 let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, Unbounded));
818
819 bui.rewind().await.unwrap();
821 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
822 bui.next().await.unwrap();
823 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
824 bui.next().await.unwrap();
825 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
826 bui.next().await.unwrap();
827 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
828 bui.next().await.unwrap();
829 assert!(!bui.is_valid());
830
831 bui.seek(iterator_test_user_key_of(2).as_ref())
833 .await
834 .unwrap();
835 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
836 bui.next().await.unwrap();
837 assert!(!bui.is_valid());
838
839 bui.seek(iterator_test_user_key_of(5).as_ref())
841 .await
842 .unwrap();
843 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
844 bui.next().await.unwrap();
845 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
846 bui.next().await.unwrap();
847 assert!(!bui.is_valid());
848
849 bui.seek(iterator_test_user_key_of(8).as_ref())
851 .await
852 .unwrap();
853 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
854 bui.next().await.unwrap();
855 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
856 bui.next().await.unwrap();
857 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
858 bui.next().await.unwrap();
859 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
860 bui.next().await.unwrap();
861 assert!(!bui.is_valid());
862
863 bui.seek(iterator_test_user_key_of(9).as_ref())
865 .await
866 .unwrap();
867 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
868 bui.next().await.unwrap();
869 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
870 bui.next().await.unwrap();
871 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
872 bui.next().await.unwrap();
873 assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
874 bui.next().await.unwrap();
875 assert!(!bui.is_valid());
876 }
877
878 fn key_from_num(num: usize) -> UserKey<Vec<u8>> {
879 let width = 20;
880 UserKey::for_test(
881 TableId::default(),
882 format!("{:0width$}", num, width = width)
883 .as_bytes()
884 .to_vec(),
885 )
886 }
887
888 async fn chaos_test_case(
889 handle: TableHolder,
890 start_bound: Bound<UserKey<Bytes>>,
891 end_bound: Bound<UserKey<Bytes>>,
892 truth: &ChaosTestTruth,
893 sstable_store: SstableStoreRef,
894 sstable_info: &SstableInfo,
895 ) {
896 let start_key = match &start_bound {
897 Bound::Included(b) => {
898 UserKey::for_test(b.table_id, Bytes::from(prev_key(&b.table_key.clone())))
899 }
900 Bound::Excluded(b) => b.clone(),
901 Unbounded => key_from_num(0).into_bytes(),
902 };
903 let end_key = match &end_bound {
904 Bound::Included(b) => b.clone(),
905 Unbounded => key_from_num(999999999999).into_bytes(),
906 _ => unimplemented!(),
907 };
908
909 let backward_iters = vec![BackwardSstableIterator::new(
910 handle,
911 sstable_store,
912 sstable_info,
913 )];
914 let bmi = MergeIterator::new(backward_iters);
915 let mut bui = BackwardUserIterator::for_test(bmi, (start_bound, end_bound));
916 let num_puts: usize = truth
917 .iter()
918 .map(|(key, inserts)| {
919 if *key > end_key || *key <= start_key {
920 return 0;
921 }
922 match inserts.first_key_value().unwrap().1 {
923 HummockValue::Put(_) => 1,
924 HummockValue::Delete => 0,
925 }
926 })
927 .reduce(|accum, item| accum + item)
928 .unwrap();
929 let mut num_kvs = 0;
930 bui.rewind().await.unwrap();
931 for (key, value) in truth.iter().rev() {
932 if *key > end_key || *key <= start_key {
933 continue;
934 }
935 let (_, value) = value.first_key_value().unwrap();
936 if let HummockValue::Delete = value {
937 continue;
938 }
939 assert!(bui.is_valid(), "num_kvs:{}", num_kvs);
940 assert_eq!(bui.key().user_key, key.as_ref(), "num_kvs:{}", num_kvs);
941 if let HummockValue::Put(bytes) = &value {
942 assert_eq!(bui.value(), bytes, "num_kvs:{}", num_kvs);
943 }
944 bui.next().await.unwrap();
945 num_kvs += 1;
946 }
947 assert!(!bui.is_valid());
948 assert_eq!(num_kvs, num_puts);
949 }
950
951 type ChaosTestTruth =
952 BTreeMap<UserKey<Bytes>, BTreeMap<Reverse<HummockEpoch>, HummockValue<Bytes>>>;
953
954 async fn generate_chaos_test_data() -> (
955 usize,
956 TableHolder,
957 ChaosTestTruth,
958 SstableStoreRef,
959 SstableInfo,
960 ) {
961 let mut rng = thread_rng();
963
964 let mut truth: ChaosTestTruth = BTreeMap::new();
965 let mut prev_key_number: usize = 1;
966 let number_of_keys = 5000;
967 for _ in 0..number_of_keys {
968 let key: usize = rng.random_range(prev_key_number..=(prev_key_number + 10));
969 prev_key_number = key + 1;
970 let key_bytes = key_from_num(key).into_bytes();
971 let mut prev_time = 500;
972 let num_updates = rng.random_range(1..10usize);
973 for _ in 0..num_updates {
974 let time: HummockEpoch =
975 test_epoch(rng.random_range(prev_time..=(prev_time + 1000)));
976 let is_delete = rng.random_range(0..=1usize) < 1usize;
977 match is_delete {
978 true => {
979 truth
980 .entry(key_bytes.clone())
981 .or_default()
982 .insert(Reverse(time), HummockValue::delete());
983 }
984 false => {
985 let value_size = rng.random_range(100..=200);
986 let value: String = thread_rng()
987 .sample_iter(&Alphanumeric)
988 .take(value_size)
989 .map(char::from)
990 .collect();
991 truth
992 .entry(key_bytes.clone())
993 .or_default()
994 .insert(Reverse(time), HummockValue::put(Bytes::from(value)));
995 }
996 }
997 prev_time = time.next_epoch();
998 }
999 }
1000 let sstable_store = mock_sstable_store().await;
1001 let (sst, sstable_info) = gen_test_sstable(
1002 default_builder_opt_for_test(),
1003 0,
1004 truth.iter().flat_map(|(key, inserts)| {
1005 inserts.iter().map(|(time, value)| {
1006 let full_key = FullKey {
1007 user_key: key.clone(),
1008 epoch_with_gap: EpochWithGap::new_from_epoch(time.0),
1009 };
1010 (full_key, value.clone())
1011 })
1012 }),
1013 sstable_store.clone(),
1014 )
1015 .await;
1016
1017 (prev_key_number, sst, truth, sstable_store, sstable_info)
1018 }
1019
1020 #[tokio::test]
1021 async fn test_backward_user_chaos_unbounded_unbounded() {
1022 let (_prev_key_number, sst, truth, sstable_store, sstable_info) =
1023 generate_chaos_test_data().await;
1024 let repeat = 20;
1025 for _ in 0..repeat {
1026 chaos_test_case(
1027 sst.clone(),
1028 Unbounded,
1029 Unbounded,
1030 &truth,
1031 sstable_store.clone(),
1032 &sstable_info,
1033 )
1034 .await;
1035 }
1036 }
1037
1038 #[tokio::test]
1039 async fn test_backward_user_chaos_unbounded_included() {
1040 let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1041 generate_chaos_test_data().await;
1042 let repeat = 20;
1043 for _ in 0..repeat {
1044 let mut rng = thread_rng();
1045 let end_key: usize = rng.random_range(2..=prev_key_number);
1046 let end_key_bytes = key_from_num(end_key).into_bytes();
1047 chaos_test_case(
1048 sst.clone(),
1049 Unbounded,
1050 Included(end_key_bytes.clone()),
1051 &truth,
1052 sstable_store.clone(),
1053 &sstable_info,
1054 )
1055 .await;
1056 }
1057 }
1058
1059 #[tokio::test]
1060 async fn test_backward_user_chaos_included_unbounded() {
1061 let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1062 generate_chaos_test_data().await;
1063 let repeat = 20;
1064 for _ in 0..repeat {
1065 let mut rng = thread_rng();
1066 let end_key: usize = rng.random_range(2..=prev_key_number);
1067 let begin_key: usize = rng.random_range(1..=end_key);
1068 let begin_key_bytes = key_from_num(begin_key).into_bytes();
1069 chaos_test_case(
1070 sst.clone(),
1071 Included(begin_key_bytes.clone()),
1072 Unbounded,
1073 &truth,
1074 sstable_store.clone(),
1075 &sstable_info,
1076 )
1077 .await;
1078 }
1079 }
1080
1081 #[tokio::test]
1082 async fn test_backward_user_chaos_excluded_unbounded() {
1083 let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1084 generate_chaos_test_data().await;
1085 let repeat = 20;
1086 for _ in 0..repeat {
1087 let mut rng = thread_rng();
1088 let end_key: usize = rng.random_range(2..=prev_key_number);
1089 let begin_key: usize = rng.random_range(1..=end_key);
1090 let begin_key_bytes = key_from_num(begin_key).into_bytes();
1091 chaos_test_case(
1092 sst.clone(),
1093 Excluded(begin_key_bytes.clone()),
1094 Unbounded,
1095 &truth,
1096 sstable_store.clone(),
1097 &sstable_info,
1098 )
1099 .await;
1100 }
1101 }
1102
1103 #[tokio::test]
1104 async fn test_backward_user_chaos_included_included() {
1105 let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1106 generate_chaos_test_data().await;
1107 let repeat = 20;
1108 for _ in 0..repeat {
1109 let mut rng = thread_rng();
1110 let end_key: usize = rng.random_range(2..=prev_key_number);
1111 let end_key_bytes = key_from_num(end_key).into_bytes();
1112 let begin_key: usize = rng.random_range(1..=end_key);
1113 let begin_key_bytes = key_from_num(begin_key).into_bytes();
1114 chaos_test_case(
1115 sst.clone(),
1116 Included(begin_key_bytes.clone()),
1117 Included(end_key_bytes.clone()),
1118 &truth,
1119 sstable_store.clone(),
1120 &sstable_info,
1121 )
1122 .await;
1123 }
1124 }
1125
1126 #[tokio::test]
1127 async fn test_backward_user_chaos_excluded_included() {
1128 let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1129 generate_chaos_test_data().await;
1130 let repeat = 20;
1131 for _ in 0..repeat {
1132 let mut rng = thread_rng();
1133 let end_key: usize = rng.random_range(2..=prev_key_number);
1134 let end_key_bytes = key_from_num(end_key).into_bytes();
1135 let begin_key: usize = rng.random_range(1..=end_key);
1136 let begin_key_bytes = key_from_num(begin_key).into_bytes();
1137 chaos_test_case(
1138 sst.clone(),
1139 Excluded(begin_key_bytes),
1140 Included(end_key_bytes),
1141 &truth,
1142 sstable_store.clone(),
1143 &sstable_info,
1144 )
1145 .await;
1146 }
1147 }
1148
1149 #[tokio::test]
1150 async fn test_min_epoch() {
1151 let sstable_store = mock_sstable_store().await;
1152 let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
1153 0,
1154 default_builder_opt_for_test(),
1155 |x| x * 3,
1156 sstable_store.clone(),
1157 TEST_KEYS_COUNT,
1158 1,
1159 )
1160 .await;
1161
1162 let backward_iters = vec![BackwardSstableIterator::new(
1163 table0,
1164 sstable_store,
1165 &sstable_info_0,
1166 )];
1167
1168 let min_count = (TEST_KEYS_COUNT / 5) as u64;
1169 let min_epoch = test_epoch(min_count);
1170 let mi = MergeIterator::new(backward_iters);
1171 let mut ui = BackwardUserIterator::with_min_epoch(mi, (Unbounded, Unbounded), min_epoch);
1172 ui.rewind().await.unwrap();
1173
1174 let mut i = 0;
1175 while ui.is_valid() {
1176 let key = ui.key();
1177 let key_epoch = key.epoch_with_gap.pure_epoch();
1178 assert!(key_epoch > min_epoch);
1179
1180 i += 1;
1181 ui.next().await.unwrap();
1182 }
1183
1184 let expect_count = TEST_KEYS_COUNT - min_count as usize;
1185 assert_eq!(i, expect_count);
1186 }
1187}