1use std::ops::Bound::*;
16
17use bytes::Bytes;
18use more_asserts::debug_assert_le;
19use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange};
20use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
21
22use crate::hummock::HummockResult;
23use crate::hummock::iterator::{Backward, HummockIterator};
24use crate::hummock::local_version::pinned_version::PinnedVersion;
25use crate::hummock::value::HummockValue;
26use crate::monitor::StoreLocalStatistic;
27
/// Iterates user key-value pairs in reverse user-key order on top of a
/// backward [`HummockIterator`], collapsing the multiple epoch-versions of
/// each user key into its newest visible version and hiding deleted keys.
pub struct BackwardUserIterator<I: HummockIterator<Direction = Backward>> {
    /// Inner (typically merged) backward iterator over full keys.
    iterator: I,

    /// Set when `next()` returned at a user-key boundary; the following call
    /// must re-initialize `last_key` from the current inner position.
    just_met_new_key: bool,

    /// Owned copy of the last full key recorded from the inner iterator.
    last_key: FullKey<Bytes>,

    /// Payload of the last recorded version (meaningful only when
    /// `last_delete` is `false`).
    last_val: Bytes,

    /// Whether the most recent version recorded for `last_key` was a
    /// tombstone. Starts `true` so `is_valid()` is `false` before the first
    /// successful `next()`.
    last_delete: bool,

    /// Set once iteration has moved before the begin bound of `key_range`.
    out_of_range: bool,

    /// Begin and end user-key bounds restricting the scan.
    key_range: UserKeyRange,

    /// Only versions with epoch `<= read_epoch` are visible.
    read_epoch: HummockEpoch,

    /// Only versions with epoch `> min_epoch` are visible.
    min_epoch: HummockEpoch,

    /// Never read — presumably held so the pinned version (and the SSTs it
    /// references) stays alive for the iterator's lifetime; confirm with
    /// callers.
    _version: Option<PinnedVersion>,

    /// Local counters (processed / skipped keys), merged out via
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
}
63
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
    /// Creates a [`BackwardUserIterator`] over `iterator`, restricted to
    /// `key_range` and to versions whose epoch lies in
    /// `(min_epoch, read_epoch]`. `version`, if provided, is held for the
    /// iterator's whole lifetime.
    pub fn new(
        iterator: I,
        key_range: UserKeyRange,
        read_epoch: u64,
        min_epoch: u64,
        version: Option<PinnedVersion>,
    ) -> Self {
        Self {
            iterator,
            out_of_range: false,
            key_range,
            just_met_new_key: false,
            last_key: FullKey::default(),
            last_val: Bytes::new(),
            // `true` so the iterator reports invalid until the first `next()`
            // produces a visible key-value pair.
            last_delete: true,
            read_epoch,
            min_epoch,
            stats: StoreLocalStatistic::default(),
            _version: version,
        }
    }

    /// Returns whether `key` lies before the begin bound of `key_range`,
    /// i.e. past the point where the backward scan must stop.
    fn out_of_range(&self, key: UserKey<&[u8]>) -> bool {
        match &self.key_range.0 {
            Included(begin_key) => key < begin_key.as_ref(),
            Excluded(begin_key) => key <= begin_key.as_ref(),
            Unbounded => false,
        }
    }

    /// Clears per-scan state; called by `rewind`/`seek` before restarting.
    fn reset(&mut self) {
        self.last_key = FullKey::default();
        self.just_met_new_key = false;
        self.last_delete = true;
        self.out_of_range = false;
    }

    /// Advances to the next visible user key (descending key order),
    /// skipping versions outside `(min_epoch, read_epoch]` and keys whose
    /// newest visible version is a delete.
    ///
    /// The inner iterator runs backwards over full keys sorted by
    /// (user key asc, epoch desc), so the versions of one user key arrive
    /// from oldest to newest; each visible version overwrites
    /// `last_val`/`last_delete`, and by the time the user key changes,
    /// `last_*` holds the newest visible version of the previous key.
    ///
    /// Check `is_valid()` after the call; on success `key()`/`value()`
    /// expose the pair.
    pub async fn next(&mut self) -> HummockResult<()> {
        if !self.iterator.is_valid() {
            // Inner iterator exhausted: the pair in `last_*` has already been
            // emitted, so mark it consumed to turn `is_valid()` false.
            self.last_delete = true;
            return Ok(());
        }

        while self.iterator.is_valid() {
            let full_key = self.iterator.key();
            let epoch = full_key.epoch_with_gap.pure_epoch();
            let key = &full_key.user_key;

            // Versions outside the visible epoch window are skipped entirely.
            if epoch > self.min_epoch && epoch <= self.read_epoch {
                if self.just_met_new_key {
                    // First visible version of the key we stopped at last time.
                    self.last_key = full_key.copy_into();
                    self.just_met_new_key = false;
                    if self.out_of_range(self.last_key.user_key.as_ref()) {
                        self.out_of_range = true;
                        break;
                    }
                } else if self.last_key.user_key.as_ref() != *key {
                    if !self.last_delete {
                        // Crossed a user-key boundary with a visible put
                        // recorded: emit it. `just_met_new_key` defers the
                        // current key's processing to the next call.
                        self.just_met_new_key = true;
                        self.stats.processed_key_count += 1;
                        return Ok(());
                    } else {
                        // Previous key's newest version was a delete: drop it
                        // and start collecting the current key.
                        self.last_key = full_key.copy_into();
                        if self.out_of_range(self.last_key.user_key.as_ref()) {
                            self.out_of_range = true;
                            break;
                        }
                    }
                } else {
                    self.stats.skip_multi_version_key_count += 1;
                }
                // Newer versions overwrite older ones (see doc comment above).
                match self.iterator.value() {
                    HummockValue::Put(val) => {
                        self.last_key = full_key.copy_into();
                        self.last_val = Bytes::copy_from_slice(val);
                        self.last_delete = false;
                    }
                    HummockValue::Delete => {
                        self.last_delete = true;
                    }
                }
            }
            self.iterator.next().await?;
        }
        Ok(())
    }

    /// Returns the current full key. Panics if `!is_valid()`.
    pub fn key(&self) -> FullKey<&[u8]> {
        assert!(self.is_valid());
        self.last_key.to_ref()
    }

    /// Returns the current value. Panics if `!is_valid()`.
    pub fn value(&self) -> &[u8] {
        assert!(self.is_valid());
        &self.last_val
    }

    /// Repositions at the largest in-range visible user key: seeks the inner
    /// iterator to the end bound (or rewinds when unbounded), advances once,
    /// and for an excluded end bound skips the bound key itself.
    pub async fn rewind(&mut self) -> HummockResult<()> {
        match &self.key_range.1 {
            Included(end_key) | Excluded(end_key) => {
                let full_key = FullKey {
                    user_key: end_key.as_ref(),
                    // The minimal epoch sorts after all real versions of the
                    // key, so the backward seek lands on its newest version.
                    epoch_with_gap: EpochWithGap::new_min_epoch(),
                };
                self.iterator.seek(full_key).await?;
            }
            Unbounded => self.iterator.rewind().await?,
        };

        self.reset();
        self.next().await?;
        if let Excluded(end_key) = &self.key_range.1
            && self.is_valid()
            && self.key().user_key == end_key.as_ref()
        {
            // The end key itself is excluded from the range.
            self.next().await?;
        }
        Ok(())
    }

    /// Positions at the largest visible user key `<= user_key`, clamped by
    /// the end bound of `key_range`.
    pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
        // Clamp the seek target by the end bound of the range.
        let seek_key = match &self.key_range.1 {
            Included(end_key) | Excluded(end_key) => {
                let end_key = end_key.as_ref();
                if end_key < user_key {
                    end_key
                } else {
                    user_key
                }
            }
            Unbounded => user_key,
        };
        let full_key = FullKey {
            user_key: seek_key,
            epoch_with_gap: EpochWithGap::new_min_epoch(),
        };
        self.iterator.seek(full_key).await?;

        self.reset();
        self.next().await?;
        if let Excluded(end_key) = &self.key_range.1
            && self.is_valid()
            && self.key().user_key == end_key.as_ref()
        {
            // Landing on the excluded end key is only possible when the
            // target was clamped down to it.
            debug_assert_le!(end_key.as_ref(), user_key);
            self.next().await?;
        }
        Ok(())
    }

    /// Whether `key()`/`value()` currently expose a valid pair.
    pub fn is_valid(&self) -> bool {
        // Output remains available while the inner iterator can produce more
        // input, or while the last recorded version is a visible put — and
        // only as long as the scan has not passed the begin bound.
        let has_enough_input = self.iterator.is_valid() || !self.last_delete;
        has_enough_input && (!self.out_of_range)
    }

    /// Merges this iterator's local statistics, plus the inner iterator's,
    /// into `stats`.
    pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        stats.add(&self.stats);
        self.iterator.collect_local_statistic(stats);
    }
}
288
#[cfg(test)]
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
    /// Test-only constructor: reads at the maximal epoch with no minimum
    /// epoch filter and no pinned version.
    pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
        Self::with_min_epoch(iterator, key_range, 0)
    }

    /// Test-only constructor with an explicit `min_epoch` filter; reads at
    /// the maximal epoch with no pinned version.
    pub(crate) fn with_min_epoch(
        iterator: I,
        key_range: UserKeyRange,
        min_epoch: HummockEpoch,
    ) -> Self {
        let read_epoch = HummockEpoch::MAX;
        Self::new(iterator, key_range, read_epoch, min_epoch, None)
    }
}
305
306#[cfg(test)]
307mod tests {
308 use std::cmp::Reverse;
309 use std::collections::BTreeMap;
310 use std::ops::Bound::{self, *};
311
312 use rand::distr::Alphanumeric;
313 use rand::{Rng, rng as thread_rng};
314 use risingwave_common::catalog::TableId;
315 use risingwave_common::util::epoch::{EpochExt, test_epoch};
316 use risingwave_hummock_sdk::key::prev_key;
317 use risingwave_hummock_sdk::sstable_info::SstableInfo;
318
319 use super::*;
320 use crate::hummock::iterator::MergeIterator;
321 use crate::hummock::iterator::test_utils::{
322 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
323 gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
324 iterator_test_bytes_key_of, iterator_test_bytes_key_of_epoch,
325 iterator_test_bytes_user_key_of, iterator_test_user_key_of, iterator_test_value_of,
326 mock_sstable_store,
327 };
328 use crate::hummock::test_utils::gen_test_sstable;
329 use crate::hummock::{BackwardSstableIterator, SstableStoreRef, TableHolder};
330
    /// Merging three backward SST iterators (keys `3x+1`, `3x+2`, `3x+3`)
    /// must yield all `3 * TEST_KEYS_COUNT` keys in strictly descending order.
    #[tokio::test]
    async fn test_backward_user_basic() {
        let sstable_store = mock_sstable_store().await;
        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;

        let backward_iters = vec![
            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
            BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
            BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
        ];

        let mi = MergeIterator::new(backward_iters);
        let mut ui = BackwardUserIterator::for_test(mi, (Unbounded, Unbounded));
        // The largest generated key is `3 * TEST_KEYS_COUNT`; walk downwards.
        let mut i = 3 * TEST_KEYS_COUNT;
        ui.rewind().await.unwrap();
        while ui.is_valid() {
            let key = ui.key();
            let val = ui.value();
            assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
            assert_eq!(val, iterator_test_value_of(i).as_slice());
            i -= 1;
            ui.next().await.unwrap();
            if i == 0 {
                // All keys consumed; the iterator must be exhausted.
                assert!(!ui.is_valid());
                break;
            }
        }
    }
382
    /// Every key in `1..=3 * TEST_KEYS_COUNT` exists (union of `3x+1`,
    /// `3x+2`, `3x+3`): backward `seek` must land exactly on the sought key,
    /// and seeking before the smallest key invalidates the iterator.
    #[tokio::test]
    async fn test_backward_user_seek() {
        let sstable_store = mock_sstable_store().await;
        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let backward_iters = vec![
            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
            BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
            BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
        ];

        let bmi = MergeIterator::new(backward_iters);
        let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));

        // No key <= 0 exists, so nothing is visible going backwards.
        bui.seek(iterator_test_user_key_of(0).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());

        // Seek to a key in the middle: lands exactly on it.
        bui.seek(iterator_test_user_key_of(TEST_KEYS_COUNT + 4).as_ref())
            .await
            .unwrap();
        let k = bui.key();
        let v = bui.value();
        assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 4).as_slice());
        assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 4).to_ref());
        bui.seek(iterator_test_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
            .await
            .unwrap();
        let k = bui.key();
        let v = bui.value();
        assert_eq!(
            v,
            iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
        );
        assert_eq!(
            k,
            iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
        );

        // Seek to the largest generated key.
        bui.seek(iterator_test_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
            .await
            .unwrap();
        let k = bui.key();
        let v = bui.value();
        assert_eq!(v, iterator_test_value_of(3 * TEST_KEYS_COUNT).as_slice());
        assert_eq!(k, iterator_test_bytes_key_of(3 * TEST_KEYS_COUNT).to_ref());
    }
456
    /// Across two SSTs, only the newest version of each key may be exposed:
    /// key 1 resolves to the put at epoch 400, while key 2 is hidden by the
    /// delete at epoch 200.
    #[tokio::test]
    async fn test_backward_user_delete() {
        let sstable_store = mock_sstable_store().await;
        // (key, epoch, value)
        let kv_pairs = vec![
            (1, 300, HummockValue::delete()),
            (2, 100, HummockValue::put(iterator_test_value_of(2))),
        ];
        let (table0, sstable_info_0) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;

        let kv_pairs = vec![
            (1, 400, HummockValue::put(iterator_test_value_of(1))),
            (2, 200, HummockValue::delete()),
        ];
        let (table1, sstable_info_1) =
            gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;
        let backward_iters = vec![
            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
            BackwardSstableIterator::new(table1, sstable_store, &sstable_info_1),
        ];
        let bmi = MergeIterator::new(backward_iters);
        let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));

        bui.rewind().await.unwrap();

        // Only key 1 (newest version: put at epoch 400) is visible.
        let k = bui.key();
        let v = bui.value();

        assert_eq!(k, iterator_test_bytes_key_of_epoch(1, 400).to_ref());
        assert_eq!(v, iterator_test_value_of(1).as_slice());

        bui.next().await.unwrap();
        assert!(!bui.is_valid());
    }
494
    /// Range `[2, 7]`: keys 2 (newest version is the delete at epoch 400)
    /// and 5 (delete at 200) are hidden, so only 7, 6 and 3 are visible, and
    /// seeks beyond the end bound clamp down to it.
    #[tokio::test]
    async fn test_backward_user_range_inclusive() {
        let sstable_store = mock_sstable_store().await;
        // (key, epoch, value); versions of a key are listed newest-first.
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 400, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::put(iterator_test_value_of(2))),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 300, HummockValue::put(iterator_test_value_of(7))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (sstable, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let backward_iters = vec![BackwardSstableIterator::new(
            sstable,
            sstable_store,
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));

        // Full backward scan: 7 (newest put at epoch 300), 6, 3.
        bui.rewind().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Seeking past the end bound clamps to it.
        bui.seek(iterator_test_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Key 2's newest version is a delete and nothing smaller is in range.
        bui.seek(iterator_test_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());

        // Below the begin bound: nothing visible.
        bui.seek(iterator_test_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());
    }
578
    /// Range `(2, 7]`: key 2 is excluded by the begin bound even though its
    /// newest version is a put, and 5 is hidden by a delete. A second pass
    /// checks range `(2, 7)`, where the end key 7 itself is also skipped.
    #[tokio::test]
    async fn test_backward_user_range() {
        let sstable_store = mock_sstable_store().await;
        // (key, epoch, value); versions of a key are listed newest-first.
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (sstable, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let backward_iters = vec![BackwardSstableIterator::new(
            sstable.clone(),
            sstable_store.clone(),
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);

        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));

        // Full backward scan over (2, 7]: 7, 6, 3.
        bui.rewind().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Seeking past the end bound clamps to it.
        bui.seek(iterator_test_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Key 2 is excluded by the begin bound: nothing visible at or below it.
        bui.seek(iterator_test_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());

        // Second pass: both bounds excluded, i.e. range (2, 7).
        let backward_iters = vec![BackwardSstableIterator::new(
            sstable,
            sstable_store,
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);

        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));

        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
        bui.rewind().await.unwrap();
        assert!(bui.is_valid());
        // End key 7 is excluded, so the scan starts at 6.
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.seek(iterator_test_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.seek(iterator_test_user_key_of(5).as_ref())
            .await
            .unwrap();
        // Key 5 is hidden by the delete at epoch 200.
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
    }
683
    /// Range `[2, 7]` where the end key 7's newest version is a delete: the
    /// scan starts at 6 and ends at 2 (visible as the put at epoch 300).
    /// A second pass checks range `(2, 6)` with both bounds excluded.
    #[tokio::test]
    async fn test_backward_user_range_to_inclusive() {
        let sstable_store = mock_sstable_store().await;
        // (key, epoch, value); versions of a key are listed newest-first.
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (sstable, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let backward_iters = vec![BackwardSstableIterator::new(
            sstable.clone(),
            sstable_store.clone(),
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut bui = BackwardUserIterator::for_test(
            bmi,
            (Included(iterator_test_bytes_user_key_of(2)), end_key),
        );

        // Key 7 is hidden by its delete at epoch 200: scan yields 6, 3, 2.
        bui.rewind().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(6).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Below the begin bound: nothing visible.
        bui.seek(iterator_test_user_key_of(0).as_ref())
            .await
            .unwrap();
        assert!(!bui.is_valid());

        // Second pass: range (2, 6), both bounds excluded.
        let end_key = Excluded(iterator_test_bytes_user_key_of(6));
        let backward_iters = vec![BackwardSstableIterator::new(
            sstable,
            sstable_store,
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);
        let mut bui = BackwardUserIterator::for_test(
            bmi,
            (Excluded(iterator_test_bytes_user_key_of(2)), end_key),
        );
        // 6 is excluded and 5 is deleted, so 3 is the only visible key.
        bui.seek(iterator_test_user_key_of(6).as_ref())
            .await
            .unwrap();
        assert!(bui.is_valid());
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());
        bui.seek(iterator_test_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert!(bui.is_valid());
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
    }
785
    /// Range `[2, +∞)`: visible keys are 8, 6, 3 and 2 (newest version of
    /// 2 is a put at epoch 300); 5 and 7 are hidden by deletes, and seeking
    /// below the begin bound yields nothing.
    #[tokio::test]
    async fn test_backward_user_range_from() {
        let sstable_store = mock_sstable_store().await;
        // (key, epoch, value); versions of a key are listed newest-first.
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (handle, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let backward_iters = vec![BackwardSstableIterator::new(
            handle,
            sstable_store,
            &sstable_info,
        )];
        let bmi = MergeIterator::new(backward_iters);
        let begin_key = Included(iterator_test_bytes_user_key_of(2));

        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, Unbounded));

        bui.rewind().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Seek onto the (included) begin bound: only key 2 remains.
        bui.seek(iterator_test_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Key 5 is hidden by a delete: the seek lands on 3.
        bui.seek(iterator_test_user_key_of(5).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        bui.seek(iterator_test_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());

        // Seeking beyond the largest key starts from the largest key (8).
        bui.seek(iterator_test_user_key_of(9).as_ref())
            .await
            .unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        bui.next().await.unwrap();
        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        bui.next().await.unwrap();
        assert!(!bui.is_valid());
    }
877
878 fn key_from_num(num: usize) -> UserKey<Vec<u8>> {
879 let width = 20;
880 UserKey::for_test(
881 TableId::default(),
882 format!("{:0width$}", num, width = width)
883 .as_bytes()
884 .to_vec(),
885 )
886 }
887
888 #[allow(clippy::mutable_key_type)]
889 async fn chaos_test_case(
890 handle: TableHolder,
891 start_bound: Bound<UserKey<Bytes>>,
892 end_bound: Bound<UserKey<Bytes>>,
893 truth: &ChaosTestTruth,
894 sstable_store: SstableStoreRef,
895 sstable_info: &SstableInfo,
896 ) {
897 let start_key = match &start_bound {
898 Bound::Included(b) => {
899 UserKey::for_test(b.table_id, Bytes::from(prev_key(&b.table_key.clone())))
900 }
901 Bound::Excluded(b) => b.clone(),
902 Unbounded => key_from_num(0).into_bytes(),
903 };
904 let end_key = match &end_bound {
905 Bound::Included(b) => b.clone(),
906 Unbounded => key_from_num(999999999999).into_bytes(),
907 _ => unimplemented!(),
908 };
909
910 let backward_iters = vec![BackwardSstableIterator::new(
911 handle,
912 sstable_store,
913 sstable_info,
914 )];
915 let bmi = MergeIterator::new(backward_iters);
916 let mut bui = BackwardUserIterator::for_test(bmi, (start_bound, end_bound));
917 let num_puts: usize = truth
918 .iter()
919 .map(|(key, inserts)| {
920 if *key > end_key || *key <= start_key {
921 return 0;
922 }
923 match inserts.first_key_value().unwrap().1 {
924 HummockValue::Put(_) => 1,
925 HummockValue::Delete => 0,
926 }
927 })
928 .reduce(|accum, item| accum + item)
929 .unwrap();
930 let mut num_kvs = 0;
931 bui.rewind().await.unwrap();
932 for (key, value) in truth.iter().rev() {
933 if *key > end_key || *key <= start_key {
934 continue;
935 }
936 let (_, value) = value.first_key_value().unwrap();
937 if let HummockValue::Delete = value {
938 continue;
939 }
940 assert!(bui.is_valid(), "num_kvs:{}", num_kvs);
941 assert_eq!(bui.key().user_key, key.as_ref(), "num_kvs:{}", num_kvs);
942 if let HummockValue::Put(bytes) = &value {
943 assert_eq!(bui.value(), bytes, "num_kvs:{}", num_kvs);
944 }
945 bui.next().await.unwrap();
946 num_kvs += 1;
947 }
948 assert!(!bui.is_valid());
949 assert_eq!(num_kvs, num_puts);
950 }
951
    /// Ground truth for the chaos tests: user key -> versions keyed by
    /// `Reverse(epoch)`, so `first_key_value()` yields the newest version.
    type ChaosTestTruth =
        BTreeMap<UserKey<Bytes>, BTreeMap<Reverse<HummockEpoch>, HummockValue<Bytes>>>;
954
    /// Builds a random SST plus its ground truth: 5000 strictly increasing
    /// random keys, each with 1..10 versions at increasing epochs, roughly
    /// half of them tombstones. Returns (one past the largest key number,
    /// table handle, truth map, sstable store, sstable info).
    async fn generate_chaos_test_data() -> (
        usize,
        TableHolder,
        ChaosTestTruth,
        SstableStoreRef,
        SstableInfo,
    ) {
        let mut rng = thread_rng();
        #[allow(clippy::mutable_key_type)]
        let mut truth: ChaosTestTruth = BTreeMap::new();
        let mut prev_key_number: usize = 1;
        let number_of_keys = 5000;
        for _ in 0..number_of_keys {
            // Keys are strictly increasing with random gaps of at most 10.
            let key: usize = rng.random_range(prev_key_number..=(prev_key_number + 10));
            prev_key_number = key + 1;
            let key_bytes = key_from_num(key).into_bytes();
            let mut prev_time = 500;
            let num_updates = rng.random_range(1..10usize);
            for _ in 0..num_updates {
                // Epochs within a key are strictly increasing.
                let time: HummockEpoch =
                    test_epoch(rng.random_range(prev_time..=(prev_time + 1000)));
                // 50/50 chance of a tombstone vs. a random alphanumeric value.
                let is_delete = rng.random_range(0..=1usize) < 1usize;
                match is_delete {
                    true => {
                        truth
                            .entry(key_bytes.clone())
                            .or_default()
                            .insert(Reverse(time), HummockValue::delete());
                    }
                    false => {
                        let value_size = rng.random_range(100..=200);
                        let value: String = thread_rng()
                            .sample_iter(&Alphanumeric)
                            .take(value_size)
                            .map(char::from)
                            .collect();
                        truth
                            .entry(key_bytes.clone())
                            .or_default()
                            .insert(Reverse(time), HummockValue::put(Bytes::from(value)));
                    }
                }
                prev_time = time.next_epoch();
            }
        }
        let sstable_store = mock_sstable_store().await;
        let (sst, sstable_info) = gen_test_sstable(
            default_builder_opt_for_test(),
            0,
            // Flatten the truth map into (full key, value) pairs for the SST.
            truth.iter().flat_map(|(key, inserts)| {
                inserts.iter().map(|(time, value)| {
                    let full_key = FullKey {
                        user_key: key.clone(),
                        epoch_with_gap: EpochWithGap::new_from_epoch(time.0),
                    };
                    (full_key, value.clone())
                })
            }),
            sstable_store.clone(),
        )
        .await;

        (prev_key_number, sst, truth, sstable_store, sstable_info)
    }
1020
    /// Chaos check over the full, unbounded key range.
    #[tokio::test]
    async fn test_backward_user_chaos_unbounded_unbounded() {
        let (_prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            chaos_test_case(
                sst.clone(),
                Unbounded,
                Unbounded,
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1038
    /// Chaos check with an unbounded start and a random included end bound.
    #[tokio::test]
    async fn test_backward_user_chaos_unbounded_included() {
        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            let mut rng = thread_rng();
            let end_key: usize = rng.random_range(2..=prev_key_number);
            let end_key_bytes = key_from_num(end_key).into_bytes();
            chaos_test_case(
                sst.clone(),
                Unbounded,
                Included(end_key_bytes.clone()),
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1059
    /// Chaos check with a random included start bound and an unbounded end.
    #[tokio::test]
    async fn test_backward_user_chaos_included_unbounded() {
        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            let mut rng = thread_rng();
            // `end_key` only bounds the random choice of `begin_key` here.
            let end_key: usize = rng.random_range(2..=prev_key_number);
            let begin_key: usize = rng.random_range(1..=end_key);
            let begin_key_bytes = key_from_num(begin_key).into_bytes();
            chaos_test_case(
                sst.clone(),
                Included(begin_key_bytes.clone()),
                Unbounded,
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1081
    /// Chaos check with a random excluded start bound and an unbounded end.
    #[tokio::test]
    async fn test_backward_user_chaos_excluded_unbounded() {
        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            let mut rng = thread_rng();
            // `end_key` only bounds the random choice of `begin_key` here.
            let end_key: usize = rng.random_range(2..=prev_key_number);
            let begin_key: usize = rng.random_range(1..=end_key);
            let begin_key_bytes = key_from_num(begin_key).into_bytes();
            chaos_test_case(
                sst.clone(),
                Excluded(begin_key_bytes.clone()),
                Unbounded,
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1103
    /// Chaos check with random included start and included end bounds.
    #[tokio::test]
    async fn test_backward_user_chaos_included_included() {
        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            let mut rng = thread_rng();
            let end_key: usize = rng.random_range(2..=prev_key_number);
            let end_key_bytes = key_from_num(end_key).into_bytes();
            let begin_key: usize = rng.random_range(1..=end_key);
            let begin_key_bytes = key_from_num(begin_key).into_bytes();
            chaos_test_case(
                sst.clone(),
                Included(begin_key_bytes.clone()),
                Included(end_key_bytes.clone()),
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1126
    /// Chaos check with a random excluded start and an included end bound.
    #[tokio::test]
    async fn test_backward_user_chaos_excluded_included() {
        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
            generate_chaos_test_data().await;
        let repeat = 20;
        for _ in 0..repeat {
            let mut rng = thread_rng();
            let end_key: usize = rng.random_range(2..=prev_key_number);
            let end_key_bytes = key_from_num(end_key).into_bytes();
            let begin_key: usize = rng.random_range(1..=end_key);
            let begin_key_bytes = key_from_num(begin_key).into_bytes();
            chaos_test_case(
                sst.clone(),
                Excluded(begin_key_bytes),
                Included(end_key_bytes),
                &truth,
                sstable_store.clone(),
                &sstable_info,
            )
            .await;
        }
    }
1149
    /// Versions with epoch `<= min_epoch` must be filtered out by the
    /// iterator's epoch window.
    #[tokio::test]
    async fn test_min_epoch() {
        let sstable_store = mock_sstable_store().await;
        let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
            1,
        )
        .await;

        let backward_iters = vec![BackwardSstableIterator::new(
            table0,
            sstable_store,
            &sstable_info_0,
        )];

        let min_count = (TEST_KEYS_COUNT / 5) as u64;
        let min_epoch = test_epoch(min_count);
        let mi = MergeIterator::new(backward_iters);
        let mut ui = BackwardUserIterator::with_min_epoch(mi, (Unbounded, Unbounded), min_epoch);
        ui.rewind().await.unwrap();

        let mut i = 0;
        while ui.is_valid() {
            let key = ui.key();
            let key_epoch = key.epoch_with_gap.pure_epoch();
            // Every surviving version must lie strictly above `min_epoch`.
            assert!(key_epoch > min_epoch);

            i += 1;
            ui.next().await.unwrap();
        }

        // NOTE(review): the expected count assumes the generator assigns one
        // strictly increasing epoch per key starting from test_epoch(1) —
        // confirm against `gen_iterator_test_sstable_with_incr_epoch`.
        let expect_count = TEST_KEYS_COUNT - min_count as usize;
        assert_eq!(i, expect_count);
    }
1188}