// risingwave_storage/hummock/iterator/backward_user.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Bound::*;
16
17use bytes::Bytes;
18use more_asserts::debug_assert_le;
19use risingwave_hummock_sdk::key::{FullKey, UserKey, UserKeyRange};
20use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
21
22use crate::hummock::HummockResult;
23use crate::hummock::iterator::{Backward, HummockIterator};
24use crate::hummock::local_version::pinned_version::PinnedVersion;
25use crate::hummock::value::HummockValue;
26use crate::monitor::StoreLocalStatistic;
27
28/// [`BackwardUserIterator`] can be used by user directly.
29pub struct BackwardUserIterator<I: HummockIterator<Direction = Backward>> {
30    /// Inner table iterator.
31    iterator: I,
32
33    /// We just met a new key
34    just_met_new_key: bool,
35
36    /// Last user key
37    last_key: FullKey<Bytes>,
38
39    /// Last user value
40    last_val: Bytes,
41
42    /// Last user key value is deleted
43    last_delete: bool,
44
45    /// Flag for whether the iterator reaches over the right end of the range.
46    out_of_range: bool,
47
48    /// Start and end bounds of user key.
49    key_range: UserKeyRange,
50
51    /// Only reads values if `epoch <= self.read_epoch`.
52    read_epoch: HummockEpoch,
53
54    /// Only reads values if `ts > self.min_epoch`. use for ttl
55    min_epoch: HummockEpoch,
56
57    /// Ensures the SSTs needed by `iterator` won't be vacuumed.
58    _version: Option<PinnedVersion>,
59
60    /// Store scan statistic
61    stats: StoreLocalStatistic,
62}
63
64impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
65    /// Creates [`BackwardUserIterator`] with given `read_epoch`.
66    pub fn new(
67        iterator: I,
68        key_range: UserKeyRange,
69        read_epoch: u64,
70        min_epoch: u64,
71        version: Option<PinnedVersion>,
72    ) -> Self {
73        Self {
74            iterator,
75            out_of_range: false,
76            key_range,
77            just_met_new_key: false,
78            last_key: FullKey::default(),
79            last_val: Bytes::new(),
80            last_delete: true,
81            read_epoch,
82            min_epoch,
83            stats: StoreLocalStatistic::default(),
84            _version: version,
85        }
86    }
87
88    fn out_of_range(&self, key: UserKey<&[u8]>) -> bool {
89        match &self.key_range.0 {
90            Included(begin_key) => key < begin_key.as_ref(),
91            Excluded(begin_key) => key <= begin_key.as_ref(),
92            Unbounded => false,
93        }
94    }
95
96    fn reset(&mut self) {
97        self.last_key = FullKey::default();
98        self.just_met_new_key = false;
99        self.last_delete = true;
100        self.out_of_range = false;
101    }
102
103    /// Gets the iterator move to the next step.
104    ///
105    /// Returned result:
106    /// - if `Ok(())` is returned, it means that the iterator successfully move to the next position
107    ///   (may reach to the end and thus not valid)
108    /// - if `Err(_) ` is returned, it means that some error happened.
109    pub async fn next(&mut self) -> HummockResult<()> {
110        // We need to deal with three cases:
111        // 1. current key == last key.
112        //    Since current key must have an epoch newer than the one of the last key,
113        //    we assign current kv as the new last kv and also inherit its status of deletion, and
114        // continue.
115        //
116        // 2. current key != last key.
117        //    We have to make a decision for the last key.
118        //    a. If it is not deleted, we stop.
119        //    b. Otherwise, we continue to find the next new key.
120        //
121        // 3. `self.iterator` invalid. The case is the same as 2. However, option b is invalid now.
122        // We just stop. Without further `next`, `BackwardUserIterator` is still valid.
123
124        // We remark that whether `self.iterator` is valid and `BackwardUserIterator` is valid can
125        // be different even if we leave `out_of_range` out of consideration. This diffs
126        // from `UserIterator` because we always make a decision about the past key only
127        // when we enter a new state, such as encountering a new key, or `self.iterator`
128        // turning invalid.
129
130        if !self.iterator.is_valid() {
131            // We abuse `last_delete` to indicate that we are indeed invalid now, i.e. run out of kv
132            // pairs.
133            self.last_delete = true;
134            return Ok(());
135        }
136
137        while self.iterator.is_valid() {
138            let full_key = self.iterator.key();
139            let epoch = full_key.epoch_with_gap.pure_epoch();
140            let key = &full_key.user_key;
141
142            if epoch > self.min_epoch && epoch <= self.read_epoch {
143                if self.just_met_new_key {
144                    self.last_key = full_key.copy_into();
145                    self.just_met_new_key = false;
146                    // If we encounter an out-of-range key, stop early.
147                    if self.out_of_range(self.last_key.user_key.as_ref()) {
148                        self.out_of_range = true;
149                        break;
150                    }
151                } else if self.last_key.user_key.as_ref() != *key {
152                    if !self.last_delete {
153                        // We remark that we don't check `out_of_range` here as the other two cases
154                        // covered all situation. 2(a)
155                        self.just_met_new_key = true;
156                        self.stats.processed_key_count += 1;
157                        return Ok(());
158                    } else {
159                        // 2(b)
160                        self.last_key = full_key.copy_into();
161                        // If we encounter an out-of-range key, stop early.
162                        if self.out_of_range(self.last_key.user_key.as_ref()) {
163                            self.out_of_range = true;
164                            break;
165                        }
166                    }
167                } else {
168                    self.stats.skip_multi_version_key_count += 1;
169                }
170                // TODO: Since the real world workload may follow power law or 20/80 rule, or
171                // whatever name. We may directly seek to the next key if we have
172                // been seeing the same key for too many times.
173
174                // 1 and 2(a)
175                match self.iterator.value() {
176                    HummockValue::Put(val) => {
177                        // TODO: unconditionally set the last key may lead to redundant copies
178                        self.last_key = full_key.copy_into();
179                        self.last_val = Bytes::copy_from_slice(val);
180                        self.last_delete = false;
181                    }
182                    HummockValue::Delete => {
183                        self.last_delete = true;
184                    }
185                }
186            }
187            self.iterator.next().await?;
188        }
189        Ok(()) // not valid, EOF
190    }
191
192    /// Returns the key with the newest version. Thus no version in it, and only the `user_key` will
193    /// be returned.
194    ///
195    /// The returned key is de-duplicated and thus it will not output the same key, unless the
196    /// `rewind` or `seek` methods are called.
197    ///
198    /// Note: before call the function you need to ensure that the iterator is valid.
199    pub fn key(&self) -> FullKey<&[u8]> {
200        assert!(self.is_valid());
201        self.last_key.to_ref()
202    }
203
204    /// The returned value is in the form of user value.
205    ///
206    /// Note: before call the function you need to ensure that the iterator is valid.
207    pub fn value(&self) -> &[u8] {
208        assert!(self.is_valid());
209        &self.last_val
210    }
211
212    /// Resets the iterating position to the beginning.
213    pub async fn rewind(&mut self) -> HummockResult<()> {
214        // Handle range scan
215        match &self.key_range.1 {
216            Included(end_key) | Excluded(end_key) => {
217                let full_key = FullKey {
218                    user_key: end_key.as_ref(),
219                    epoch_with_gap: EpochWithGap::new_min_epoch(),
220                };
221                self.iterator.seek(full_key).await?;
222            }
223            Unbounded => self.iterator.rewind().await?,
224        };
225
226        // Handle multi-version
227        self.reset();
228        // Handle range scan when key < begin_key
229        self.next().await?;
230        if let Excluded(end_key) = &self.key_range.1
231            && self.is_valid()
232            && self.key().user_key == end_key.as_ref()
233        {
234            self.next().await?;
235        }
236        Ok(())
237    }
238
239    /// Resets the iterating position to the first position where the key >= provided key.
240    pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
241        // Handle range scan when key > end_key
242        let seek_key = match &self.key_range.1 {
243            Included(end_key) | Excluded(end_key) => {
244                let end_key = end_key.as_ref();
245                if end_key < user_key {
246                    end_key
247                } else {
248                    user_key
249                }
250            }
251            Unbounded => user_key,
252        };
253        let full_key = FullKey {
254            user_key: seek_key,
255            epoch_with_gap: EpochWithGap::new_min_epoch(),
256        };
257        self.iterator.seek(full_key).await?;
258
259        // Handle multi-version
260        self.reset();
261        // Handle range scan when key < begin_key
262        self.next().await?;
263        if let Excluded(end_key) = &self.key_range.1
264            && self.is_valid()
265            && self.key().user_key == end_key.as_ref()
266        {
267            debug_assert_le!(end_key.as_ref(), user_key);
268            self.next().await?;
269        }
270        Ok(())
271    }
272
273    /// Indicates whether the iterator can be used.
274    pub fn is_valid(&self) -> bool {
275        // Handle range scan
276        // key <= end_key is guaranteed by seek/rewind function
277        // We remark that there are only three cases out of four combinations:
278        // (iterator valid && last_delete false) is impossible
279        let has_enough_input = self.iterator.is_valid() || !self.last_delete;
280        has_enough_input && (!self.out_of_range)
281    }
282
283    pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
284        stats.add(&self.stats);
285        self.iterator.collect_local_statistic(stats);
286    }
287}
288
#[cfg(test)]
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
    /// Creates a [`BackwardUserIterator`] with maximum read epoch and `min_epoch = 0`, so every
    /// epoch in `(0, HummockEpoch::MAX]` is visible. No version is pinned.
    pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
        Self::new(iterator, key_range, HummockEpoch::MAX, 0, None)
    }

    /// Creates a [`BackwardUserIterator`] with maximum read epoch that only sees versions whose
    /// epoch is greater than `min_epoch` (as used for TTL). No version is pinned.
    pub(crate) fn with_min_epoch(
        iterator: I,
        key_range: UserKeyRange,
        min_epoch: HummockEpoch,
    ) -> Self {
        Self::new(iterator, key_range, HummockEpoch::MAX, min_epoch, None)
    }
}
305
306#[cfg(test)]
307mod tests {
308    use std::cmp::Reverse;
309    use std::collections::BTreeMap;
310    use std::ops::Bound::{self, *};
311
312    use rand::distr::Alphanumeric;
313    use rand::{Rng, rng as thread_rng};
314    use risingwave_common::catalog::TableId;
315    use risingwave_common::util::epoch::{EpochExt, test_epoch};
316    use risingwave_hummock_sdk::key::prev_key;
317    use risingwave_hummock_sdk::sstable_info::SstableInfo;
318
319    use super::*;
320    use crate::hummock::iterator::MergeIterator;
321    use crate::hummock::iterator::test_utils::{
322        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
323        gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
324        iterator_test_bytes_key_of, iterator_test_bytes_key_of_epoch,
325        iterator_test_bytes_user_key_of, iterator_test_user_key_of, iterator_test_value_of,
326        mock_sstable_store,
327    };
328    use crate::hummock::test_utils::gen_test_sstable;
329    use crate::hummock::{BackwardSstableIterator, SstableStoreRef, TableHolder};
330
331    #[tokio::test]
332    async fn test_backward_user_basic() {
333        let sstable_store = mock_sstable_store().await;
334        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
335            0,
336            default_builder_opt_for_test(),
337            |x| x * 3 + 1,
338            sstable_store.clone(),
339            TEST_KEYS_COUNT,
340        )
341        .await;
342        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
343            1,
344            default_builder_opt_for_test(),
345            |x| x * 3 + 2,
346            sstable_store.clone(),
347            TEST_KEYS_COUNT,
348        )
349        .await;
350        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
351            2,
352            default_builder_opt_for_test(),
353            |x| x * 3 + 3,
354            sstable_store.clone(),
355            TEST_KEYS_COUNT,
356        )
357        .await;
358
359        let backward_iters = vec![
360            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
361            BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
362            BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
363        ];
364
365        let mi = MergeIterator::new(backward_iters);
366        let mut ui = BackwardUserIterator::for_test(mi, (Unbounded, Unbounded));
367        let mut i = 3 * TEST_KEYS_COUNT;
368        ui.rewind().await.unwrap();
369        while ui.is_valid() {
370            let key = ui.key();
371            let val = ui.value();
372            assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
373            assert_eq!(val, iterator_test_value_of(i).as_slice());
374            i -= 1;
375            ui.next().await.unwrap();
376            if i == 0 {
377                assert!(!ui.is_valid());
378                break;
379            }
380        }
381    }
382
383    #[tokio::test]
384    async fn test_backward_user_seek() {
385        let sstable_store = mock_sstable_store().await;
386        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
387            0,
388            default_builder_opt_for_test(),
389            |x| x * 3 + 1,
390            sstable_store.clone(),
391            TEST_KEYS_COUNT,
392        )
393        .await;
394        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
395            1,
396            default_builder_opt_for_test(),
397            |x| x * 3 + 2,
398            sstable_store.clone(),
399            TEST_KEYS_COUNT,
400        )
401        .await;
402        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
403            2,
404            default_builder_opt_for_test(),
405            |x| x * 3 + 3,
406            sstable_store.clone(),
407            TEST_KEYS_COUNT,
408        )
409        .await;
410        let backward_iters = vec![
411            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
412            BackwardSstableIterator::new(table1, sstable_store.clone(), &sstable_info_1),
413            BackwardSstableIterator::new(table2, sstable_store, &sstable_info_2),
414        ];
415
416        let bmi = MergeIterator::new(backward_iters);
417        let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
418
419        // right edge case
420        bui.seek(iterator_test_user_key_of(0).as_ref())
421            .await
422            .unwrap();
423        assert!(!bui.is_valid());
424
425        // normal case
426        bui.seek(iterator_test_user_key_of(TEST_KEYS_COUNT + 4).as_ref())
427            .await
428            .unwrap();
429        let k = bui.key();
430        let v = bui.value();
431        assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 4).as_slice());
432        assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 4).to_ref());
433        bui.seek(iterator_test_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
434            .await
435            .unwrap();
436        let k = bui.key();
437        let v = bui.value();
438        assert_eq!(
439            v,
440            iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
441        );
442        assert_eq!(
443            k,
444            iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
445        );
446
447        // left edge case
448        bui.seek(iterator_test_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
449            .await
450            .unwrap();
451        let k = bui.key();
452        let v = bui.value();
453        assert_eq!(v, iterator_test_value_of(3 * TEST_KEYS_COUNT).as_slice());
454        assert_eq!(k, iterator_test_bytes_key_of(3 * TEST_KEYS_COUNT).to_ref());
455    }
456
457    #[tokio::test]
458    async fn test_backward_user_delete() {
459        let sstable_store = mock_sstable_store().await;
460        // key=[idx, epoch], value
461        let kv_pairs = vec![
462            (1, 300, HummockValue::delete()),
463            (2, 100, HummockValue::put(iterator_test_value_of(2))),
464        ];
465        let (table0, sstable_info_0) =
466            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
467
468        let kv_pairs = vec![
469            (1, 400, HummockValue::put(iterator_test_value_of(1))),
470            (2, 200, HummockValue::delete()),
471        ];
472        let (table1, sstable_info_1) =
473            gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;
474        let backward_iters = vec![
475            BackwardSstableIterator::new(table0, sstable_store.clone(), &sstable_info_0),
476            BackwardSstableIterator::new(table1, sstable_store, &sstable_info_1),
477        ];
478        let bmi = MergeIterator::new(backward_iters);
479        let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded));
480
481        bui.rewind().await.unwrap();
482
483        // verify
484        let k = bui.key();
485        let v = bui.value();
486
487        assert_eq!(k, iterator_test_bytes_key_of_epoch(1, 400).to_ref());
488        assert_eq!(v, iterator_test_value_of(1).as_slice());
489
490        // only one valid kv pair
491        bui.next().await.unwrap();
492        assert!(!bui.is_valid());
493    }
494
495    // left..=end
496    #[tokio::test]
497    async fn test_backward_user_range_inclusive() {
498        let sstable_store = mock_sstable_store().await;
499        // key=[idx, epoch], value
500        let kv_pairs = vec![
501            (0, 200, HummockValue::delete()),
502            (0, 100, HummockValue::put(iterator_test_value_of(0))),
503            (1, 200, HummockValue::put(iterator_test_value_of(1))),
504            (1, 100, HummockValue::delete()),
505            (2, 400, HummockValue::delete()),
506            (2, 300, HummockValue::put(iterator_test_value_of(2))),
507            (2, 200, HummockValue::delete()),
508            (2, 100, HummockValue::put(iterator_test_value_of(2))),
509            (3, 100, HummockValue::put(iterator_test_value_of(3))),
510            (5, 200, HummockValue::delete()),
511            (5, 100, HummockValue::put(iterator_test_value_of(5))),
512            (6, 100, HummockValue::put(iterator_test_value_of(6))),
513            (7, 300, HummockValue::put(iterator_test_value_of(7))),
514            (7, 200, HummockValue::delete()),
515            (7, 100, HummockValue::put(iterator_test_value_of(7))),
516            (8, 100, HummockValue::put(iterator_test_value_of(8))),
517        ];
518        let (sstable, sstable_info) =
519            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
520        let backward_iters = vec![BackwardSstableIterator::new(
521            sstable,
522            sstable_store,
523            &sstable_info,
524        )];
525        let bmi = MergeIterator::new(backward_iters);
526
527        let begin_key = Included(iterator_test_bytes_user_key_of(2));
528        let end_key = Included(iterator_test_bytes_user_key_of(7));
529
530        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
531
532        // ----- basic iterate -----
533        bui.rewind().await.unwrap();
534        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
535        bui.next().await.unwrap();
536        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
537        bui.next().await.unwrap();
538        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
539        bui.next().await.unwrap();
540        assert!(!bui.is_valid());
541
542        // ----- after-end-range iterate -----
543        bui.seek(iterator_test_user_key_of(8).as_ref())
544            .await
545            .unwrap();
546        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
547        bui.next().await.unwrap();
548        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
549        bui.next().await.unwrap();
550        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
551        bui.next().await.unwrap();
552        assert!(!bui.is_valid());
553
554        // ----- end-range iterate -----
555        bui.seek(iterator_test_user_key_of(7).as_ref())
556            .await
557            .unwrap();
558        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 300).to_ref());
559        bui.next().await.unwrap();
560        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
561        bui.next().await.unwrap();
562        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
563        bui.next().await.unwrap();
564        assert!(!bui.is_valid());
565
566        // ----- begin-range iterate -----
567        bui.seek(iterator_test_user_key_of(2).as_ref())
568            .await
569            .unwrap();
570        assert!(!bui.is_valid());
571
572        // ----- before-begin-range iterate -----
573        bui.seek(iterator_test_user_key_of(1).as_ref())
574            .await
575            .unwrap();
576        assert!(!bui.is_valid());
577    }
578
579    // left..end
580    #[tokio::test]
581    async fn test_backward_user_range() {
582        let sstable_store = mock_sstable_store().await;
583        // key=[idx, epoch], value
584        let kv_pairs = vec![
585            (0, 200, HummockValue::delete()),
586            (0, 100, HummockValue::put(iterator_test_value_of(0))),
587            (1, 200, HummockValue::put(iterator_test_value_of(1))),
588            (1, 100, HummockValue::delete()),
589            (2, 300, HummockValue::put(iterator_test_value_of(2))),
590            (2, 200, HummockValue::delete()),
591            (2, 100, HummockValue::delete()),
592            (3, 100, HummockValue::put(iterator_test_value_of(3))),
593            (5, 200, HummockValue::delete()),
594            (5, 100, HummockValue::put(iterator_test_value_of(5))),
595            (6, 100, HummockValue::put(iterator_test_value_of(6))),
596            (7, 100, HummockValue::put(iterator_test_value_of(7))),
597            (8, 100, HummockValue::put(iterator_test_value_of(8))),
598        ];
599        let (sstable, sstable_info) =
600            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
601        let backward_iters = vec![BackwardSstableIterator::new(
602            sstable.clone(),
603            sstable_store.clone(),
604            &sstable_info,
605        )];
606        let bmi = MergeIterator::new(backward_iters);
607
608        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
609        let end_key = Included(iterator_test_bytes_user_key_of(7));
610
611        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
612
613        // ----- basic iterate -----
614        bui.rewind().await.unwrap();
615        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
616        bui.next().await.unwrap();
617        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
618        bui.next().await.unwrap();
619        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
620        bui.next().await.unwrap();
621        assert!(!bui.is_valid());
622
623        // ----- after-bend-range iterate -----
624        bui.seek(iterator_test_user_key_of(8).as_ref())
625            .await
626            .unwrap();
627        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
628        bui.next().await.unwrap();
629        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
630        bui.next().await.unwrap();
631        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
632        bui.next().await.unwrap();
633        assert!(!bui.is_valid());
634
635        // ----- end-range iterate -----
636        bui.seek(iterator_test_user_key_of(7).as_ref())
637            .await
638            .unwrap();
639        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(7, 100).to_ref());
640        bui.next().await.unwrap();
641        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
642        bui.next().await.unwrap();
643        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
644        bui.next().await.unwrap();
645        assert!(!bui.is_valid());
646
647        // ----- begin-range iterate -----
648        bui.seek(iterator_test_user_key_of(2).as_ref())
649            .await
650            .unwrap();
651        assert!(!bui.is_valid());
652
653        // ----- begin-begin-range iterate -----
654        bui.seek(iterator_test_user_key_of(1).as_ref())
655            .await
656            .unwrap();
657        assert!(!bui.is_valid());
658
659        let backward_iters = vec![BackwardSstableIterator::new(
660            sstable,
661            sstable_store,
662            &sstable_info,
663        )];
664        let bmi = MergeIterator::new(backward_iters);
665
666        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
667        let end_key = Excluded(iterator_test_bytes_user_key_of(7));
668
669        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, end_key));
670        bui.rewind().await.unwrap();
671        assert!(bui.is_valid());
672        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
673        // ----- end-range iterate -----
674        bui.seek(iterator_test_user_key_of(7).as_ref())
675            .await
676            .unwrap();
677        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
678        bui.seek(iterator_test_user_key_of(5).as_ref())
679            .await
680            .unwrap();
681        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
682    }
683
684    // ..=right
685    #[tokio::test]
686    async fn test_backward_user_range_to_inclusive() {
687        let sstable_store = mock_sstable_store().await;
688        // key=[idx, epoch], value
689        let kv_pairs = vec![
690            (0, 200, HummockValue::delete()),
691            (0, 100, HummockValue::put(iterator_test_value_of(0))),
692            (1, 200, HummockValue::put(iterator_test_value_of(1))),
693            (1, 100, HummockValue::delete()),
694            (2, 300, HummockValue::put(iterator_test_value_of(2))),
695            (2, 200, HummockValue::delete()),
696            (2, 100, HummockValue::delete()),
697            (3, 100, HummockValue::put(iterator_test_value_of(3))),
698            (5, 200, HummockValue::delete()),
699            (5, 100, HummockValue::put(iterator_test_value_of(5))),
700            (6, 100, HummockValue::put(iterator_test_value_of(6))),
701            (7, 200, HummockValue::delete()),
702            (7, 100, HummockValue::put(iterator_test_value_of(7))),
703            (8, 100, HummockValue::put(iterator_test_value_of(8))),
704        ];
705        let (sstable, sstable_info) =
706            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
707        let backward_iters = vec![BackwardSstableIterator::new(
708            sstable.clone(),
709            sstable_store.clone(),
710            &sstable_info,
711        )];
712        let bmi = MergeIterator::new(backward_iters);
713        let end_key = Included(iterator_test_bytes_user_key_of(7));
714
715        let mut bui = BackwardUserIterator::for_test(
716            bmi,
717            (Included(iterator_test_bytes_user_key_of(2)), end_key),
718        );
719
720        // ----- basic iterate -----
721        bui.rewind().await.unwrap();
722        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
723        bui.next().await.unwrap();
724        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
725        bui.next().await.unwrap();
726        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
727        bui.next().await.unwrap();
728        assert!(!bui.is_valid());
729
730        // ----- end-range iterate -----
731        bui.seek(iterator_test_user_key_of(7).as_ref())
732            .await
733            .unwrap();
734        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
735        bui.next().await.unwrap();
736        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
737        bui.next().await.unwrap();
738        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
739        bui.next().await.unwrap();
740        assert!(!bui.is_valid());
741
742        // ----- in-range iterate -----
743        bui.seek(iterator_test_user_key_of(6).as_ref())
744            .await
745            .unwrap();
746        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
747        bui.next().await.unwrap();
748        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
749        bui.next().await.unwrap();
750        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
751        bui.next().await.unwrap();
752        assert!(!bui.is_valid());
753
754        // ----- begin-range iterate -----
755        bui.seek(iterator_test_user_key_of(0).as_ref())
756            .await
757            .unwrap();
758        assert!(!bui.is_valid());
759
760        let end_key = Excluded(iterator_test_bytes_user_key_of(6));
761        let backward_iters = vec![BackwardSstableIterator::new(
762            sstable,
763            sstable_store,
764            &sstable_info,
765        )];
766        let bmi = MergeIterator::new(backward_iters);
767        let mut bui = BackwardUserIterator::for_test(
768            bmi,
769            (Excluded(iterator_test_bytes_user_key_of(2)), end_key),
770        );
771        // ----- basic iterate -----
772        bui.seek(iterator_test_user_key_of(6).as_ref())
773            .await
774            .unwrap();
775        assert!(bui.is_valid());
776        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
777        bui.next().await.unwrap();
778        assert!(!bui.is_valid());
779        bui.seek(iterator_test_user_key_of(7).as_ref())
780            .await
781            .unwrap();
782        assert!(bui.is_valid());
783        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
784    }
785
786    // left..
787    #[tokio::test]
788    async fn test_backward_user_range_from() {
789        let sstable_store = mock_sstable_store().await;
790        // key=[idx, epoch], value
791        let kv_pairs = vec![
792            (0, 200, HummockValue::delete()),
793            (0, 100, HummockValue::put(iterator_test_value_of(0))),
794            (1, 200, HummockValue::put(iterator_test_value_of(1))),
795            (1, 100, HummockValue::delete()),
796            (2, 300, HummockValue::put(iterator_test_value_of(2))),
797            (2, 200, HummockValue::delete()),
798            (2, 100, HummockValue::delete()),
799            (3, 100, HummockValue::put(iterator_test_value_of(3))),
800            (5, 200, HummockValue::delete()),
801            (5, 100, HummockValue::put(iterator_test_value_of(5))),
802            (6, 100, HummockValue::put(iterator_test_value_of(6))),
803            (7, 200, HummockValue::delete()),
804            (7, 100, HummockValue::put(iterator_test_value_of(7))),
805            (8, 100, HummockValue::put(iterator_test_value_of(8))),
806        ];
807        let (handle, sstable_info) =
808            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
809        let backward_iters = vec![BackwardSstableIterator::new(
810            handle,
811            sstable_store,
812            &sstable_info,
813        )];
814        let bmi = MergeIterator::new(backward_iters);
815        let begin_key = Included(iterator_test_bytes_user_key_of(2));
816
817        let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, Unbounded));
818
819        // ----- basic iterate -----
820        bui.rewind().await.unwrap();
821        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
822        bui.next().await.unwrap();
823        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
824        bui.next().await.unwrap();
825        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
826        bui.next().await.unwrap();
827        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
828        bui.next().await.unwrap();
829        assert!(!bui.is_valid());
830
831        // ----- begin-range iterate -----
832        bui.seek(iterator_test_user_key_of(2).as_ref())
833            .await
834            .unwrap();
835        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
836        bui.next().await.unwrap();
837        assert!(!bui.is_valid());
838
839        // ----- in-range iterate -----
840        bui.seek(iterator_test_user_key_of(5).as_ref())
841            .await
842            .unwrap();
843        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
844        bui.next().await.unwrap();
845        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
846        bui.next().await.unwrap();
847        assert!(!bui.is_valid());
848
849        // ----- end-range iterate -----
850        bui.seek(iterator_test_user_key_of(8).as_ref())
851            .await
852            .unwrap();
853        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
854        bui.next().await.unwrap();
855        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
856        bui.next().await.unwrap();
857        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
858        bui.next().await.unwrap();
859        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
860        bui.next().await.unwrap();
861        assert!(!bui.is_valid());
862
863        // ----- after-end-range iterate -----
864        bui.seek(iterator_test_user_key_of(9).as_ref())
865            .await
866            .unwrap();
867        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
868        bui.next().await.unwrap();
869        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
870        bui.next().await.unwrap();
871        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
872        bui.next().await.unwrap();
873        assert_eq!(bui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
874        bui.next().await.unwrap();
875        assert!(!bui.is_valid());
876    }
877
878    fn key_from_num(num: usize) -> UserKey<Vec<u8>> {
879        let width = 20;
880        UserKey::for_test(
881            TableId::default(),
882            format!("{:0width$}", num, width = width)
883                .as_bytes()
884                .to_vec(),
885        )
886    }
887
888    #[allow(clippy::mutable_key_type)]
889    async fn chaos_test_case(
890        handle: TableHolder,
891        start_bound: Bound<UserKey<Bytes>>,
892        end_bound: Bound<UserKey<Bytes>>,
893        truth: &ChaosTestTruth,
894        sstable_store: SstableStoreRef,
895        sstable_info: &SstableInfo,
896    ) {
897        let start_key = match &start_bound {
898            Bound::Included(b) => {
899                UserKey::for_test(b.table_id, Bytes::from(prev_key(&b.table_key.clone())))
900            }
901            Bound::Excluded(b) => b.clone(),
902            Unbounded => key_from_num(0).into_bytes(),
903        };
904        let end_key = match &end_bound {
905            Bound::Included(b) => b.clone(),
906            Unbounded => key_from_num(999999999999).into_bytes(),
907            _ => unimplemented!(),
908        };
909
910        let backward_iters = vec![BackwardSstableIterator::new(
911            handle,
912            sstable_store,
913            sstable_info,
914        )];
915        let bmi = MergeIterator::new(backward_iters);
916        let mut bui = BackwardUserIterator::for_test(bmi, (start_bound, end_bound));
917        let num_puts: usize = truth
918            .iter()
919            .map(|(key, inserts)| {
920                if *key > end_key || *key <= start_key {
921                    return 0;
922                }
923                match inserts.first_key_value().unwrap().1 {
924                    HummockValue::Put(_) => 1,
925                    HummockValue::Delete => 0,
926                }
927            })
928            .reduce(|accum, item| accum + item)
929            .unwrap();
930        let mut num_kvs = 0;
931        bui.rewind().await.unwrap();
932        for (key, value) in truth.iter().rev() {
933            if *key > end_key || *key <= start_key {
934                continue;
935            }
936            let (_, value) = value.first_key_value().unwrap();
937            if let HummockValue::Delete = value {
938                continue;
939            }
940            assert!(bui.is_valid(), "num_kvs:{}", num_kvs);
941            assert_eq!(bui.key().user_key, key.as_ref(), "num_kvs:{}", num_kvs);
942            if let HummockValue::Put(bytes) = &value {
943                assert_eq!(bui.value(), bytes, "num_kvs:{}", num_kvs);
944            }
945            bui.next().await.unwrap();
946            num_kvs += 1;
947        }
948        assert!(!bui.is_valid());
949        assert_eq!(num_kvs, num_puts);
950    }
951
    /// Ground truth for the chaos tests: every user key mapped to all of its versions. The inner
    /// map is keyed by `Reverse(epoch)`, so its first entry (`first_key_value`) is the newest
    /// version of the key.
    type ChaosTestTruth =
        BTreeMap<UserKey<Bytes>, BTreeMap<Reverse<HummockEpoch>, HummockValue<Bytes>>>;
954
955    async fn generate_chaos_test_data() -> (
956        usize,
957        TableHolder,
958        ChaosTestTruth,
959        SstableStoreRef,
960        SstableInfo,
961    ) {
962        // We first generate the key value pairs.
963        let mut rng = thread_rng();
964        #[allow(clippy::mutable_key_type)]
965        let mut truth: ChaosTestTruth = BTreeMap::new();
966        let mut prev_key_number: usize = 1;
967        let number_of_keys = 5000;
968        for _ in 0..number_of_keys {
969            let key: usize = rng.random_range(prev_key_number..=(prev_key_number + 10));
970            prev_key_number = key + 1;
971            let key_bytes = key_from_num(key).into_bytes();
972            let mut prev_time = 500;
973            let num_updates = rng.random_range(1..10usize);
974            for _ in 0..num_updates {
975                let time: HummockEpoch =
976                    test_epoch(rng.random_range(prev_time..=(prev_time + 1000)));
977                let is_delete = rng.random_range(0..=1usize) < 1usize;
978                match is_delete {
979                    true => {
980                        truth
981                            .entry(key_bytes.clone())
982                            .or_default()
983                            .insert(Reverse(time), HummockValue::delete());
984                    }
985                    false => {
986                        let value_size = rng.random_range(100..=200);
987                        let value: String = thread_rng()
988                            .sample_iter(&Alphanumeric)
989                            .take(value_size)
990                            .map(char::from)
991                            .collect();
992                        truth
993                            .entry(key_bytes.clone())
994                            .or_default()
995                            .insert(Reverse(time), HummockValue::put(Bytes::from(value)));
996                    }
997                }
998                prev_time = time.next_epoch();
999            }
1000        }
1001        let sstable_store = mock_sstable_store().await;
1002        let (sst, sstable_info) = gen_test_sstable(
1003            default_builder_opt_for_test(),
1004            0,
1005            truth.iter().flat_map(|(key, inserts)| {
1006                inserts.iter().map(|(time, value)| {
1007                    let full_key = FullKey {
1008                        user_key: key.clone(),
1009                        epoch_with_gap: EpochWithGap::new_from_epoch(time.0),
1010                    };
1011                    (full_key, value.clone())
1012                })
1013            }),
1014            sstable_store.clone(),
1015        )
1016        .await;
1017
1018        (prev_key_number, sst, truth, sstable_store, sstable_info)
1019    }
1020
1021    #[tokio::test]
1022    async fn test_backward_user_chaos_unbounded_unbounded() {
1023        let (_prev_key_number, sst, truth, sstable_store, sstable_info) =
1024            generate_chaos_test_data().await;
1025        let repeat = 20;
1026        for _ in 0..repeat {
1027            chaos_test_case(
1028                sst.clone(),
1029                Unbounded,
1030                Unbounded,
1031                &truth,
1032                sstable_store.clone(),
1033                &sstable_info,
1034            )
1035            .await;
1036        }
1037    }
1038
1039    #[tokio::test]
1040    async fn test_backward_user_chaos_unbounded_included() {
1041        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1042            generate_chaos_test_data().await;
1043        let repeat = 20;
1044        for _ in 0..repeat {
1045            let mut rng = thread_rng();
1046            let end_key: usize = rng.random_range(2..=prev_key_number);
1047            let end_key_bytes = key_from_num(end_key).into_bytes();
1048            chaos_test_case(
1049                sst.clone(),
1050                Unbounded,
1051                Included(end_key_bytes.clone()),
1052                &truth,
1053                sstable_store.clone(),
1054                &sstable_info,
1055            )
1056            .await;
1057        }
1058    }
1059
1060    #[tokio::test]
1061    async fn test_backward_user_chaos_included_unbounded() {
1062        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1063            generate_chaos_test_data().await;
1064        let repeat = 20;
1065        for _ in 0..repeat {
1066            let mut rng = thread_rng();
1067            let end_key: usize = rng.random_range(2..=prev_key_number);
1068            let begin_key: usize = rng.random_range(1..=end_key);
1069            let begin_key_bytes = key_from_num(begin_key).into_bytes();
1070            chaos_test_case(
1071                sst.clone(),
1072                Included(begin_key_bytes.clone()),
1073                Unbounded,
1074                &truth,
1075                sstable_store.clone(),
1076                &sstable_info,
1077            )
1078            .await;
1079        }
1080    }
1081
1082    #[tokio::test]
1083    async fn test_backward_user_chaos_excluded_unbounded() {
1084        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1085            generate_chaos_test_data().await;
1086        let repeat = 20;
1087        for _ in 0..repeat {
1088            let mut rng = thread_rng();
1089            let end_key: usize = rng.random_range(2..=prev_key_number);
1090            let begin_key: usize = rng.random_range(1..=end_key);
1091            let begin_key_bytes = key_from_num(begin_key).into_bytes();
1092            chaos_test_case(
1093                sst.clone(),
1094                Excluded(begin_key_bytes.clone()),
1095                Unbounded,
1096                &truth,
1097                sstable_store.clone(),
1098                &sstable_info,
1099            )
1100            .await;
1101        }
1102    }
1103
1104    #[tokio::test]
1105    async fn test_backward_user_chaos_included_included() {
1106        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1107            generate_chaos_test_data().await;
1108        let repeat = 20;
1109        for _ in 0..repeat {
1110            let mut rng = thread_rng();
1111            let end_key: usize = rng.random_range(2..=prev_key_number);
1112            let end_key_bytes = key_from_num(end_key).into_bytes();
1113            let begin_key: usize = rng.random_range(1..=end_key);
1114            let begin_key_bytes = key_from_num(begin_key).into_bytes();
1115            chaos_test_case(
1116                sst.clone(),
1117                Included(begin_key_bytes.clone()),
1118                Included(end_key_bytes.clone()),
1119                &truth,
1120                sstable_store.clone(),
1121                &sstable_info,
1122            )
1123            .await;
1124        }
1125    }
1126
1127    #[tokio::test]
1128    async fn test_backward_user_chaos_excluded_included() {
1129        let (prev_key_number, sst, truth, sstable_store, sstable_info) =
1130            generate_chaos_test_data().await;
1131        let repeat = 20;
1132        for _ in 0..repeat {
1133            let mut rng = thread_rng();
1134            let end_key: usize = rng.random_range(2..=prev_key_number);
1135            let end_key_bytes = key_from_num(end_key).into_bytes();
1136            let begin_key: usize = rng.random_range(1..=end_key);
1137            let begin_key_bytes = key_from_num(begin_key).into_bytes();
1138            chaos_test_case(
1139                sst.clone(),
1140                Excluded(begin_key_bytes),
1141                Included(end_key_bytes),
1142                &truth,
1143                sstable_store.clone(),
1144                &sstable_info,
1145            )
1146            .await;
1147        }
1148    }
1149
1150    #[tokio::test]
1151    async fn test_min_epoch() {
1152        let sstable_store = mock_sstable_store().await;
1153        let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
1154            0,
1155            default_builder_opt_for_test(),
1156            |x| x * 3,
1157            sstable_store.clone(),
1158            TEST_KEYS_COUNT,
1159            1,
1160        )
1161        .await;
1162
1163        let backward_iters = vec![BackwardSstableIterator::new(
1164            table0,
1165            sstable_store,
1166            &sstable_info_0,
1167        )];
1168
1169        let min_count = (TEST_KEYS_COUNT / 5) as u64;
1170        let min_epoch = test_epoch(min_count);
1171        let mi = MergeIterator::new(backward_iters);
1172        let mut ui = BackwardUserIterator::with_min_epoch(mi, (Unbounded, Unbounded), min_epoch);
1173        ui.rewind().await.unwrap();
1174
1175        let mut i = 0;
1176        while ui.is_valid() {
1177            let key = ui.key();
1178            let key_epoch = key.epoch_with_gap.pure_epoch();
1179            assert!(key_epoch > min_epoch);
1180
1181            i += 1;
1182            ui.next().await.unwrap();
1183        }
1184
1185        let expect_count = TEST_KEYS_COUNT - min_count as usize;
1186        assert_eq!(i, expect_count);
1187    }
1188}