risingwave_storage/hummock/iterator/
forward_user.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound::*;
16
17use more_asserts::debug_assert_ge;
18use risingwave_common::must_match;
19use risingwave_common::util::epoch::MAX_SPILL_TIMES;
20use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
21use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
22
23use crate::hummock::HummockResult;
24use crate::hummock::iterator::{Forward, HummockIterator};
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::value::HummockValue;
27use crate::monitor::StoreLocalStatistic;
28
/// [`UserIterator`] can be used by user directly.
///
/// It wraps a forward [`HummockIterator`] and only exposes, for each user key inside
/// `key_range`, the newest entry whose epoch lies in `min_epoch..=read_epoch`, skipping
/// keys whose newest such entry is a delete.
pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
    /// Inner table iterator.
    iterator: I,

    /// Tracks the last seen full key; used to skip older versions of the same user key.
    full_key_tracker: FullKeyTracker<Vec<u8>, true>,

    /// Start and end bounds of user key.
    key_range: UserKeyRange,

    /// Only reads values if `epoch <= self.read_epoch`.
    read_epoch: HummockEpoch,

    /// Only reads values if `epoch >= self.min_epoch`. Used for TTL.
    min_epoch: HummockEpoch,

    /// Ensures the SSTs needed by `iterator` won't be vacuumed.
    _version: Option<PinnedVersion>,

    /// Counters updated while iterating (processed keys, skipped deletes and skipped older
    /// versions); reported through `collect_local_statistic`.
    stats: StoreLocalStatistic,

    /// Whether the iterator is pointing to a valid position
    is_current_pos_valid: bool,
}
54
55// TODO: decide whether this should also impl `HummockIterator`
56impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
57    /// Create [`UserIterator`] with given `read_epoch`.
58    pub(crate) fn new(
59        iterator: I,
60        key_range: UserKeyRange,
61        read_epoch: u64,
62        min_epoch: u64,
63        version: Option<PinnedVersion>,
64    ) -> Self {
65        Self {
66            iterator,
67            key_range,
68            read_epoch,
69            min_epoch,
70            stats: StoreLocalStatistic::default(),
71            _version: version,
72            is_current_pos_valid: false,
73            full_key_tracker: FullKeyTracker::new(FullKey::default()),
74        }
75    }
76
77    /// Create [`UserIterator`] with maximum epoch.
78    pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
79        let read_epoch = HummockEpoch::MAX;
80        Self::new(iterator, key_range, read_epoch, 0, None)
81    }
82
    /// Moves the iterator to the next entry that can be exposed to the user.
    /// See `IteratorState` for the state machine details (not defined in this file — TODO
    /// confirm the reference is still current).
    ///
    /// Returned result:
    /// - if `Ok(())` is returned, it means that the iterator successfully moved to the next
    ///   position (it may have reached the end and thus not be valid; check `is_valid`)
    /// - if `Err(_)` is returned, it means that some error happened and `is_valid` returns
    ///   `false`.
    pub async fn next(&mut self) -> HummockResult<()> {
        // Reset the valid flag first so that `is_valid` returns false if an error happens below.
        self.is_current_pos_valid = false;
        // Step the inner iterator past the entry it is currently pointing to.
        self.iterator.next().await?;

        // Check and move onto the next valid position if any
        self.try_advance_to_next_valid().await
    }
99
    /// Returns the full key (user key together with its epoch) of the current entry, as
    /// recorded by the full key tracker.
    ///
    /// Older versions of a user key are skipped while advancing, so the same user key is not
    /// returned twice unless the `rewind` or `seek` methods are called.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid; ensure `is_valid` returns `true` before calling.
    pub fn key(&self) -> FullKey<&[u8]> {
        assert!(self.is_valid());
        self.full_key_tracker.latest_full_key.to_ref()
    }
111
    /// Returns the user value of the current entry.
    ///
    /// A valid position always refers to a `HummockValue::Put` entry (deletes are skipped
    /// while advancing), which is what `must_match!` relies on here.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid; ensure `is_valid` returns `true` before calling.
    pub fn value(&self) -> &[u8] {
        assert!(self.is_valid());
        must_match!(self.iterator.value(), HummockValue::Put(val) => val)
    }
119
120    /// Resets the iterating position to the beginning.
121    pub async fn rewind(&mut self) -> HummockResult<()> {
122        // Reset
123        self.is_current_pos_valid = false;
124        self.full_key_tracker = FullKeyTracker::new(FullKey::default());
125
126        // Handle range scan
127        match &self.key_range.0 {
128            Included(begin_key) | Excluded(begin_key) => {
129                let full_key = FullKey {
130                    user_key: begin_key.as_ref(),
131                    epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
132                };
133                self.iterator.seek(full_key).await?;
134            }
135            Unbounded => {
136                self.iterator.rewind().await?;
137            }
138        };
139
140        self.try_advance_to_next_valid().await?;
141        if let Excluded(begin_key) = &self.key_range.0
142            && self.is_valid()
143            && self.key().user_key == begin_key.as_ref()
144        {
145            self.next().await?;
146        }
147        Ok(())
148    }
149
150    /// Resets the iterating position to the first position where the key >= provided key.
151    pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
152        // Reset
153        self.is_current_pos_valid = false;
154        self.full_key_tracker = FullKeyTracker::new(FullKey::default());
155
156        // Handle range scan when key < begin_key
157        let seek_key = match &self.key_range.0 {
158            Included(begin_key) | Excluded(begin_key) => {
159                let begin_key = begin_key.as_ref();
160                if begin_key > user_key {
161                    begin_key
162                } else {
163                    user_key
164                }
165            }
166            Unbounded => user_key,
167        };
168
169        let full_key = FullKey {
170            user_key: seek_key,
171            epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
172        };
173        self.iterator.seek(full_key).await?;
174
175        self.try_advance_to_next_valid().await?;
176        if let Excluded(begin_key) = &self.key_range.0
177            && self.is_valid()
178            && self.key().user_key == begin_key.as_ref()
179        {
180            debug_assert_ge!(begin_key.as_ref(), user_key);
181            self.next().await?;
182        }
183        Ok(())
184    }
185
    /// Indicates whether the iterator currently points to an entry that can be read through
    /// `key` and `value` (both assert on this).
    pub fn is_valid(&self) -> bool {
        self.is_current_pos_valid
    }
190
    /// Adds the statistics gathered by this iterator to `stats`, then lets the inner iterator
    /// add its own.
    pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        stats.add(&self.stats);
        self.iterator.collect_local_statistic(stats);
    }
195
196    /// Advance the inner iterator to a valid position, in which the entry can be exposed.
197    /// Iterator will not be advanced if it already pointed to a valid position.
198    async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
199        loop {
200            if !self.iterator.is_valid() {
201                break;
202            }
203
204            let full_key = self.iterator.key();
205            let epoch = full_key.epoch_with_gap.pure_epoch();
206
207            // Handle epoch visibility
208            if epoch < self.min_epoch || epoch > self.read_epoch {
209                self.iterator.next().await?;
210                continue;
211            }
212
213            // Skip older version entry for the same user key
214            if !self.full_key_tracker.observe(full_key) {
215                self.stats.skip_multi_version_key_count += 1;
216                self.iterator.next().await?;
217                continue;
218            }
219
220            // A new user key is observed.
221
222            if self.user_key_out_of_range(full_key.user_key) {
223                break;
224            }
225
226            // Handle delete operation
227            match self.iterator.value() {
228                HummockValue::Put(_val) => {
229                    self.stats.processed_key_count += 1;
230                    self.is_current_pos_valid = true;
231                    return Ok(());
232                }
233                // It means that the key is deleted from the storage.
234                // Deleted kv and the previous versions (if any) of the key should not be
235                // returned to user.
236                HummockValue::Delete => {
237                    self.stats.skip_delete_key_count += 1;
238                }
239            }
240            self.iterator.next().await?;
241        }
242
243        self.is_current_pos_valid = false;
244        Ok(())
245    }
246
247    // Validate whether the current key is already out of range.
248    fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
249        // handle range scan
250        match &self.key_range.1 {
251            Included(end_key) => user_key > end_key.as_ref(),
252            Excluded(end_key) => user_key >= end_key.as_ref(),
253            Unbounded => false,
254        }
255    }
256}
257
// Test-only constructors.
#[cfg(test)]
impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
    /// Creates a [`UserIterator`] with explicit `read_epoch` and `min_epoch` and no pinned
    /// version.
    pub(crate) fn for_test_with_epoch(
        iterator: I,
        key_range: UserKeyRange,
        read_epoch: u64,
        min_epoch: u64,
    ) -> Self {
        Self::new(iterator, key_range, read_epoch, min_epoch, None)
    }
}
269
270#[cfg(test)]
271mod tests {
272    use std::ops::Bound::*;
273    use std::sync::Arc;
274
275    use bytes::Bytes;
276    use risingwave_common::util::epoch::test_epoch;
277    use risingwave_hummock_sdk::sstable_info::SstableInfo;
278
279    use super::*;
280    use crate::hummock::TableHolder;
281    use crate::hummock::iterator::MergeIterator;
282    use crate::hummock::iterator::test_utils::{
283        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
284        gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
285        gen_iterator_test_sstable_with_range_tombstones, iterator_test_bytes_key_of,
286        iterator_test_bytes_key_of_epoch, iterator_test_bytes_user_key_of, iterator_test_value_of,
287        mock_sstable_store,
288    };
289    use crate::hummock::sstable::{
290        SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
291    };
292    use crate::hummock::sstable_store::SstableStoreRef;
293
    // Unbounded scan over three merged SSTs: every key must come back in order with its value.
    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        // Presumably the closure maps the x-th entry of each table to its key index, so the
        // three tables interleave (0, 3, 6, ... / 1, 4, 7, ... / 2, 5, 8, ...) — TODO confirm
        // against `gen_iterator_test_sstable_base`.
        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let iters = vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(
                table1,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_1,
            ),
            SstableIterator::create(table2, sstable_store, read_options.clone(), &sstable_info_2),
        ];

        let mi = MergeIterator::new(iters);
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        // Walk the whole key space; the i-th entry must be key index i.
        let mut i = 0;
        while ui.is_valid() {
            let key = ui.key();
            let val = ui.value();
            assert_eq!(key, iterator_test_bytes_key_of(i).to_ref());
            assert_eq!(val, iterator_test_value_of(i).as_slice());
            i += 1;
            ui.next().await.unwrap();
            // After the last expected key the iterator must be exhausted.
            if i == TEST_KEYS_COUNT * 3 {
                assert!(!ui.is_valid());
                break;
            }
        }
        // Guards against the loop ending early because the iterator became invalid too soon.
        assert!(i >= TEST_KEYS_COUNT * 3);
    }
357
    // Seeks on an unbounded iterator: past the last key, in the middle, and at the first key.
    #[tokio::test]
    async fn test_seek() {
        let sstable_store = mock_sstable_store().await;
        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(
                table1,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_1,
            ),
            SstableIterator::create(table2, sstable_store, read_options, &sstable_info_2),
        ];

        let mi = MergeIterator::new(iters);
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));

        // right edge case: seeking to one past the last key index leaves the iterator invalid
        ui.seek(iterator_test_bytes_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // normal case: seeking to an existing key lands exactly on it
        ui.seek(iterator_test_bytes_user_key_of(TEST_KEYS_COUNT + 5).as_ref())
            .await
            .unwrap();
        let k = ui.key();
        let v = ui.value();
        assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 5).as_slice());
        assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 5).to_ref());
        ui.seek(iterator_test_bytes_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref())
            .await
            .unwrap();
        let k = ui.key();
        let v = ui.value();
        assert_eq!(
            v,
            iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice()
        );
        assert_eq!(
            k,
            iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref()
        );

        // left edge case: seeking to the first key (also a backwards seek from the previous one)
        ui.seek(iterator_test_bytes_user_key_of(0).as_ref())
            .await
            .unwrap();
        let k = ui.key();
        let v = ui.value();
        assert_eq!(v, iterator_test_value_of(0).as_slice());
        assert_eq!(k, iterator_test_bytes_key_of(0).to_ref());
    }
442
    // A key whose newest version is a delete is hidden; a key re-put after a delete is visible.
    #[tokio::test]
    async fn test_delete() {
        let sstable_store = mock_sstable_store().await;

        // key=[idx, epoch], value
        let kv_pairs = vec![
            (1, 100, HummockValue::put(iterator_test_value_of(1))),
            (2, 300, HummockValue::delete()),
        ];
        let (table0, sstable_info_0) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;

        // The second table carries the newer version of both keys:
        // key 1 is deleted at epoch 200, key 2 is put again at epoch 400.
        let kv_pairs = vec![
            (1, 200, HummockValue::delete()),
            (2, 400, HummockValue::put(iterator_test_value_of(2))),
        ];
        let (table1, sstable_info_1) =
            gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;

        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(table1, sstable_store.clone(), read_options, &sstable_info_1),
        ];

        let mi = MergeIterator::new(iters);
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        // verify: key 1 is hidden by its delete, so the first entry is key 2 at epoch 400
        let k = ui.key();
        let v = ui.value();
        assert_eq!(k, iterator_test_bytes_key_of_epoch(2, 400).to_ref());
        assert_eq!(v, &Bytes::from(iterator_test_value_of(2)));

        // only one valid kv pair
        ui.next().await.unwrap();
        assert!(!ui.is_valid());
    }
487
    // Builds the SST shared by the range tests and loads it from `sstable_store`.
    //
    // Tuples are (key index, epoch, value). Considering the newest version of each key:
    // keys 1, 2, 3, 6 and 8 are puts, keys 0, 5 and 7 are deletes, and key 4 does not exist.
    async fn generate_test_data(sstable_store: SstableStoreRef) -> (TableHolder, SstableInfo) {
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let sst_info =
            gen_iterator_test_sstable_with_range_tombstones(0, kv_pairs, sstable_store.clone())
                .await;
        // Return the loaded table together with its metadata.
        (
            sstable_store
                .sstable(&sst_info, &mut StoreLocalStatistic::default())
                .await
                .unwrap(),
            sst_info,
        )
    }
516
    // left..=end
    // Range [2, 7] over `generate_test_data`: keys 5 and 7 are hidden by newer deletes and
    // key 4 does not exist, so only keys 2, 3 and 6 are exposed.
    #[tokio::test]
    async fn test_range_inclusive() {
        let sstable_store = mock_sstable_store().await;
        // key=[idx, epoch], value
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            &sstable_info,
        )];
        let mi = MergeIterator::new(iters);

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- before-begin-range iterate -----
        // A seek key below the begin bound is clamped to the begin bound.
        ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- begin-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- end-range iterate -----
        // Key 7 is inside the range but its newest version is a delete.
        ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // ----- after-end-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());
    }
583
    // left..end
    // Range [2, 7) first, then (2, 7) over the same data. Unlike `generate_test_data`, key 7 is
    // a live put here, so it is the exclusive end bound that hides it.
    #[tokio::test]
    async fn test_range() {
        let sstable_store = mock_sstable_store().await;
        // key=[idx, epoch], value
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (table, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![SstableIterator::create(
            table.clone(),
            sstable_store.clone(),
            read_options.clone(),
            &sstable_info,
        )];
        let mi = MergeIterator::new(iters);

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- before-begin-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- begin-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- end-range iterate -----
        // Key 7 exists as a put, but the end bound is exclusive.
        ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // ----- after-end-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // Second iterator over the same table, this time with an exclusive begin bound.
        let iters = vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            &sstable_info,
        )];
        let mi = MergeIterator::new(iters);

        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));
        // ----- excluded-begin-range seek -----
        // Seeking before or at the excluded begin key must skip key 2 and land on key 3.
        ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert!(ui.is_valid());
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert!(ui.is_valid());
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.seek(iterator_test_bytes_user_key_of(3).as_ref())
            .await
            .unwrap();
        assert!(ui.is_valid());
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        // Key 4 does not exist and key 5 is deleted, so the next visible key is 6.
        ui.seek(iterator_test_bytes_user_key_of(4).as_ref())
            .await
            .unwrap();
        assert!(ui.is_valid());
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
    }
700
    // ..=right
    // Range (unbounded, 7] over `generate_test_data`: keys 0, 5 and 7 are hidden by newer
    // deletes, so keys 1, 2, 3 and 6 are exposed.
    #[tokio::test]
    async fn test_range_to_inclusive() {
        let sstable_store = mock_sstable_store().await;
        // key=[idx, epoch], value

        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            &sstable_info,
        )];
        let mi = MergeIterator::new(iters);
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (Unbounded, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- begin-range iterate -----
        // Key 0 is deleted, so seeking to it lands on key 1.
        ui.seek(iterator_test_bytes_user_key_of(0).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- in-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- end-range iterate -----
        // Key 7 is inside the range but its newest version is a delete.
        ui.seek(iterator_test_bytes_user_key_of(7).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // ----- after-end-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());
    }
770
    // left..
    // Range [2, unbounded) over `generate_test_data`: keys 5 and 7 are hidden by newer
    // deletes, so keys 2, 3, 6 and 8 are exposed.
    #[tokio::test]
    async fn test_range_from() {
        let sstable_store = mock_sstable_store().await;
        // key=[idx, epoch], value
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            &sstable_info,
        )];
        let mi = MergeIterator::new(iters);
        let begin_key = Included(iterator_test_bytes_user_key_of(2));

        let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- before-begin-range iterate -----
        // A seek key below the begin bound is clamped to the begin bound.
        ui.seek(iterator_test_bytes_user_key_of(1).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- begin-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref());
        ui.next().await.unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- end-range iterate -----
        // Key 8 is the last key in the data set.
        ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref());
        ui.next().await.unwrap();
        assert!(!ui.is_valid());

        // ----- after-end-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(9).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());
    }
843
    // Entries whose epoch is below `min_epoch` must not be exposed; `min_epoch` itself is kept.
    #[tokio::test]
    async fn test_min_epoch() {
        let sstable_store = mock_sstable_store().await;
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
            1,
        )
        .await;
        let iters = vec![SstableIterator::create(
            table0,
            sstable_store.clone(),
            read_options.clone(),
            &sstable_info_0,
        )];

        let min_count = (TEST_KEYS_COUNT / 5) as u64;
        let min_epoch = test_epoch(min_count);
        let mi = MergeIterator::new(iters);
        let mut ui =
            UserIterator::for_test_with_epoch(mi, (Unbounded, Unbounded), u64::MAX, min_epoch);
        ui.rewind().await.unwrap();

        // Count the exposed entries and check that each one respects the lower epoch bound.
        let mut i = 0;
        while ui.is_valid() {
            let key = ui.key();
            let key_epoch = key.epoch_with_gap.pure_epoch();
            assert!(key_epoch >= min_epoch);

            i += 1;
            ui.next().await.unwrap();
        }

        // NOTE(review): this count assumes the generator assigns the k-th key the epoch
        // `test_epoch(k)` for k = 1..=TEST_KEYS_COUNT — confirm against
        // `gen_iterator_test_sstable_with_incr_epoch`.
        let expect_count = TEST_KEYS_COUNT - (min_epoch / test_epoch(1)) as usize + 1;
        assert_eq!(i, expect_count);
    }
884}