Skip to main content

risingwave_storage/hummock/iterator/
forward_user.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound::*;
16
17use more_asserts::debug_assert_ge;
18use risingwave_common::must_match;
19use risingwave_common::util::epoch::MAX_SPILL_TIMES;
20use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
21use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
22
23use crate::hummock::HummockResult;
24use crate::hummock::iterator::{Forward, HummockIterator};
25use crate::hummock::local_version::pinned_version::PinnedVersion;
26use crate::hummock::value::HummockValue;
27use crate::monitor::StoreLocalStatistic;
28
/// [`UserIterator`] can be used by user directly.
///
/// It wraps a forward [`HummockIterator`] and only exposes entries that are inside
/// `key_range`, inside the epoch window `[min_epoch, read_epoch]`, are the first
/// visible version of their user key, and are not deletions.
pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
    /// Inner table iterator.
    iterator: I,

    /// Tracks the last seen full key. It is used to skip further versions of a user
    /// key that has already been observed, and it backs the key returned by `key()`.
    full_key_tracker: FullKeyTracker<Vec<u8>, true>,

    /// Start and end bounds of user key.
    key_range: UserKeyRange,

    /// Only reads values if `ts <= self.read_epoch`.
    read_epoch: HummockEpoch,

    /// Only reads values if `ts >= self.min_epoch` (entries with a smaller epoch are
    /// skipped). Used for TTL.
    min_epoch: HummockEpoch,

    /// Ensures the SSTs needed by `iterator` won't be vacuumed.
    _version: Option<PinnedVersion>,

    /// Local counters updated while entries are processed or skipped; they are added
    /// to the caller's statistics in `collect_local_statistic`.
    stats: StoreLocalStatistic,

    /// Whether the iterator is pointing to a valid position
    is_current_pos_valid: bool,
}
54
55// TODO: decide whether this should also impl `HummockIterator`
56impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
57    /// Create [`UserIterator`] with given `read_epoch`.
58    pub(crate) fn new(
59        iterator: I,
60        key_range: UserKeyRange,
61        read_epoch: u64,
62        min_epoch: u64,
63        version: Option<PinnedVersion>,
64    ) -> Self {
65        Self {
66            iterator,
67            key_range,
68            read_epoch,
69            min_epoch,
70            stats: StoreLocalStatistic::default(),
71            _version: version,
72            is_current_pos_valid: false,
73            full_key_tracker: FullKeyTracker::new(FullKey::default()),
74        }
75    }
76
77    /// Create [`UserIterator`] with maximum epoch.
78    pub fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
79        let read_epoch = HummockEpoch::MAX;
80        Self::new(iterator, key_range, read_epoch, 0, None)
81    }
82
83    /// Gets the iterator move to the next step.
84    /// See `IteratorState` for the state machine details.
85    ///
86    /// Returned result:
87    /// - if `Ok(())` is returned, it means that the iterator successfully move to the next position
88    ///   (may reach to the end and thus not valid)
89    /// - if `Err(_) ` is returned, it means that some error happened.
90    pub async fn next(&mut self) -> HummockResult<()> {
91        // Reset the valid flag to make sure if error happens, `is_valid` should return false.
92        self.is_current_pos_valid = false;
93        // Move the iterator to the next step if it is currently potined to a ready entry.
94        self.iterator.next().await?;
95
96        // Check and move onto the next valid position if any
97        self.try_advance_to_next_valid().await
98    }
99
100    /// Returns the key with the newest version. Thus no version in it, and only the `user_key` will
101    /// be returned.
102    ///
103    /// The returned key is de-duplicated and thus it will not output the same key, unless the
104    /// `rewind` or `seek` methods are called.
105    ///
106    /// Note: before call the function you need to ensure that the iterator is valid.
107    pub fn key(&self) -> FullKey<&[u8]> {
108        assert!(self.is_valid());
109        self.full_key_tracker.latest_full_key.to_ref()
110    }
111
112    /// The returned value is in the form of user value.
113    ///
114    /// Note: before call the function you need to ensure that the iterator is valid.
115    pub fn value(&self) -> &[u8] {
116        assert!(self.is_valid());
117        must_match!(self.iterator.value(), HummockValue::Put(val) => val)
118    }
119
120    /// Resets the iterating position to the beginning.
121    pub async fn rewind(&mut self) -> HummockResult<()> {
122        // Reset
123        self.is_current_pos_valid = false;
124        self.full_key_tracker = FullKeyTracker::new(FullKey::default());
125
126        // Handle range scan
127        match &self.key_range.0 {
128            Included(begin_key) | Excluded(begin_key) => {
129                let full_key = FullKey {
130                    user_key: begin_key.as_ref(),
131                    epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
132                };
133                self.iterator.seek(full_key).await?;
134            }
135            Unbounded => {
136                self.iterator.rewind().await?;
137            }
138        };
139
140        self.try_advance_to_next_valid().await?;
141        if let Excluded(begin_key) = &self.key_range.0
142            && self.is_valid()
143            && self.key().user_key == begin_key.as_ref()
144        {
145            self.next().await?;
146        }
147        Ok(())
148    }
149
150    /// Resets the iterating position to the first position where the key >= provided key.
151    pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
152        // Reset
153        self.is_current_pos_valid = false;
154        self.full_key_tracker = FullKeyTracker::new(FullKey::default());
155
156        // Handle range scan when key < begin_key
157        let seek_key = match &self.key_range.0 {
158            Included(begin_key) | Excluded(begin_key) => {
159                let begin_key = begin_key.as_ref();
160                if begin_key > user_key {
161                    begin_key
162                } else {
163                    user_key
164                }
165            }
166            Unbounded => user_key,
167        };
168
169        let full_key = FullKey {
170            user_key: seek_key,
171            epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
172        };
173        self.iterator.seek(full_key).await?;
174
175        self.try_advance_to_next_valid().await?;
176        if let Excluded(begin_key) = &self.key_range.0
177            && self.is_valid()
178            && self.key().user_key == begin_key.as_ref()
179        {
180            debug_assert_ge!(begin_key.as_ref(), user_key);
181            self.next().await?;
182        }
183        Ok(())
184    }
185
186    /// Indicates whether the iterator can be used.
187    pub fn is_valid(&self) -> bool {
188        self.is_current_pos_valid
189    }
190
191    pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
192        stats.add(&self.stats);
193        self.iterator.collect_local_statistic(stats);
194    }
195
196    /// Advance the inner iterator to a valid position, in which the entry can be exposed.
197    /// Iterator will not be advanced if it already pointed to a valid position.
198    async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
199        loop {
200            if !self.iterator.is_valid() {
201                break;
202            }
203
204            let full_key = self.iterator.key();
205            let epoch = full_key.epoch_with_gap.pure_epoch();
206
207            // Handle epoch visibility
208            if epoch < self.min_epoch || epoch > self.read_epoch {
209                self.iterator.next().await?;
210                continue;
211            }
212
213            // Skip older version entry for the same user key
214            if !self.full_key_tracker.observe(full_key) {
215                self.stats.skip_multi_version_key_count += 1;
216                self.iterator.next().await?;
217                continue;
218            }
219
220            // A new user key is observed.
221
222            if self.user_key_out_of_range(full_key.user_key) {
223                break;
224            }
225
226            // Handle delete operation
227            match self.iterator.value() {
228                HummockValue::Put(_val) => {
229                    self.stats.processed_key_count += 1;
230                    self.is_current_pos_valid = true;
231                    return Ok(());
232                }
233                // It means that the key is deleted from the storage.
234                // Deleted kv and the previous versions (if any) of the key should not be
235                // returned to user.
236                HummockValue::Delete => {
237                    self.stats.skip_delete_key_count += 1;
238                }
239            }
240            self.iterator.next().await?;
241        }
242
243        self.is_current_pos_valid = false;
244        Ok(())
245    }
246
247    // Validate whether the current key is already out of range.
248    fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
249        // handle range scan
250        match &self.key_range.1 {
251            Included(end_key) => user_key > end_key.as_ref(),
252            Excluded(end_key) => user_key >= end_key.as_ref(),
253            Unbounded => false,
254        }
255    }
256}
257
#[cfg(test)]
impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
    /// Test-only constructor that lets the caller choose both epoch bounds.
    ///
    /// No pinned version is attached to the resulting iterator.
    pub(crate) fn for_test_with_epoch(
        iterator: I,
        key_range: UserKeyRange,
        read_epoch: u64,
        min_epoch: u64,
    ) -> Self {
        let version = None;
        Self::new(iterator, key_range, read_epoch, min_epoch, version)
    }
}
269
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::*;
    use crate::hummock::TableHolder;
    use crate::hummock::iterator::MergeIterator;
    use crate::hummock::iterator::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_iterator_test_sstable_base,
        gen_iterator_test_sstable_from_kv_pair, gen_iterator_test_sstable_with_incr_epoch,
        gen_iterator_test_sstable_with_range_tombstones, iterator_test_bytes_key_of,
        iterator_test_bytes_key_of_epoch, iterator_test_bytes_user_key_of, iterator_test_value_of,
        mock_sstable_store,
    };
    use crate::hummock::sstable::{
        SstableIterator, SstableIteratorReadOptions, SstableIteratorType,
    };
    use crate::hummock::sstable_store::SstableStoreRef;

    /// Asserts that `$ui`, starting from its current position, yields exactly the
    /// listed `(key index, epoch)` entries in order and is invalid afterwards.
    ///
    /// Must be used inside an `async` context because it awaits `next()`.
    macro_rules! assert_remaining_keys {
        ($ui:expr, [$(($idx:expr, $epoch:expr)),* $(,)?]) => {{
            $(
                assert_eq!(
                    $ui.key(),
                    iterator_test_bytes_key_of_epoch($idx, $epoch).to_ref()
                );
                $ui.next().await.unwrap();
            )*
            assert!(!$ui.is_valid());
        }};
    }

    /// Builds a merge iterator over three SSTs (object ids 0, 1, 2) of
    /// `TEST_KEYS_COUNT` keys each, whose key indexes are `3x`, `3x + 1` and `3x + 2`.
    async fn interleaved_merge_iter(
        sstable_store: SstableStoreRef,
    ) -> impl HummockIterator<Direction = Forward> {
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let (table0, sstable_info_0) = gen_iterator_test_sstable_base(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table1, sstable_info_1) = gen_iterator_test_sstable_base(
            1,
            default_builder_opt_for_test(),
            |x| x * 3 + 1,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let (table2, sstable_info_2) = gen_iterator_test_sstable_base(
            2,
            default_builder_opt_for_test(),
            |x| x * 3 + 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;

        MergeIterator::new(vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(
                table1,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_1,
            ),
            SstableIterator::create(table2, sstable_store, read_options, &sstable_info_2),
        ])
    }

    /// Builds a merge iterator over a single SST using default read options.
    fn single_table_merge_iter(
        table: TableHolder,
        sstable_store: SstableStoreRef,
        sstable_info: &SstableInfo,
    ) -> impl HummockIterator<Direction = Forward> {
        let read_options = Arc::new(SstableIteratorReadOptions::default());
        MergeIterator::new(vec![SstableIterator::create(
            table,
            sstable_store,
            read_options,
            sstable_info,
        )])
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let mi = interleaved_merge_iter(sstable_store).await;
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        // Every key of the three SSTs must come out exactly once, in index order.
        let mut i = 0;
        while ui.is_valid() {
            assert_eq!(ui.key(), iterator_test_bytes_key_of(i).to_ref());
            assert_eq!(ui.value(), iterator_test_value_of(i).as_slice());
            i += 1;
            ui.next().await.unwrap();
        }
        assert_eq!(i, TEST_KEYS_COUNT * 3);
    }

    #[tokio::test]
    async fn test_seek() {
        let sstable_store = mock_sstable_store().await;
        let mi = interleaved_merge_iter(sstable_store).await;
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));

        // right edge case
        ui.seek(iterator_test_bytes_user_key_of(3 * TEST_KEYS_COUNT).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());

        // two normal cases, then the left edge case
        for idx in [TEST_KEYS_COUNT + 5, 2 * TEST_KEYS_COUNT + 5, 0] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(ui.is_valid());
            assert_eq!(ui.value(), iterator_test_value_of(idx).as_slice());
            assert_eq!(ui.key(), iterator_test_bytes_key_of(idx).to_ref());
        }
    }

    #[tokio::test]
    async fn test_delete() {
        let sstable_store = mock_sstable_store().await;

        // key=[idx, epoch], value
        let kv_pairs = vec![
            (1, 100, HummockValue::put(iterator_test_value_of(1))),
            (2, 300, HummockValue::delete()),
        ];
        let (table0, sstable_info_0) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;

        let kv_pairs = vec![
            (1, 200, HummockValue::delete()),
            (2, 400, HummockValue::put(iterator_test_value_of(2))),
        ];
        let (table1, sstable_info_1) =
            gen_iterator_test_sstable_from_kv_pair(1, kv_pairs, sstable_store.clone()).await;

        let read_options = Arc::new(SstableIteratorReadOptions::default());
        let iters = vec![
            SstableIterator::create(
                table0,
                sstable_store.clone(),
                read_options.clone(),
                &sstable_info_0,
            ),
            SstableIterator::create(table1, sstable_store.clone(), read_options, &sstable_info_1),
        ];

        let mi = MergeIterator::new(iters);
        let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded));
        ui.rewind().await.unwrap();

        // Key 1 is hidden by its newer delete; key 2's newest version is a put.
        assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 400).to_ref());
        assert_eq!(ui.value(), &Bytes::from(iterator_test_value_of(2)));

        // only one valid kv pair
        ui.next().await.unwrap();
        assert!(!ui.is_valid());
    }

    /// Generates the SST shared by the range tests.
    ///
    /// With every epoch readable, the keys whose newest version is a put are
    /// 1 (epoch 200), 2 (epoch 300), 3, 6 and 8 (epoch 100).
    async fn generate_test_data(sstable_store: SstableStoreRef) -> (TableHolder, SstableInfo) {
        // key=[idx, epoch], value
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 200, HummockValue::delete()),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let sst_info =
            gen_iterator_test_sstable_with_range_tombstones(0, kv_pairs, sstable_store.clone())
                .await;
        (
            sstable_store
                .sstable(&sst_info, &mut StoreLocalStatistic::default())
                .await
                .unwrap(),
            sst_info,
        )
    }

    // left..=end
    #[tokio::test]
    async fn test_range_inclusive() {
        let sstable_store = mock_sstable_store().await;
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let mi = single_table_merge_iter(table, sstable_store, &sstable_info);

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100)]);

        // ----- before-begin-range (1) and begin-range (2) iterate -----
        for idx in [1, 2] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100)]);
        }

        // ----- end-range (7) and after-end-range (8) iterate -----
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }
    }

    // left..end
    #[tokio::test]
    async fn test_range() {
        let sstable_store = mock_sstable_store().await;
        // key=[idx, epoch], value
        let kv_pairs = vec![
            (0, 200, HummockValue::delete()),
            (0, 100, HummockValue::put(iterator_test_value_of(0))),
            (1, 200, HummockValue::put(iterator_test_value_of(1))),
            (1, 100, HummockValue::delete()),
            (2, 300, HummockValue::put(iterator_test_value_of(2))),
            (2, 200, HummockValue::delete()),
            (2, 100, HummockValue::delete()),
            (3, 100, HummockValue::put(iterator_test_value_of(3))),
            (5, 200, HummockValue::delete()),
            (5, 100, HummockValue::put(iterator_test_value_of(5))),
            (6, 100, HummockValue::put(iterator_test_value_of(6))),
            (7, 100, HummockValue::put(iterator_test_value_of(7))),
            (8, 100, HummockValue::put(iterator_test_value_of(8))),
        ];
        let (table, sstable_info) =
            gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await;
        let mi = single_table_merge_iter(table.clone(), sstable_store.clone(), &sstable_info);

        let begin_key = Included(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100)]);

        // ----- before-begin-range (1) and begin-range (2) iterate -----
        for idx in [1, 2] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100)]);
        }

        // ----- end-range (7) and after-end-range (8) iterate -----
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }

        let mi = single_table_merge_iter(table, sstable_store, &sstable_info);

        let begin_key = Excluded(iterator_test_bytes_user_key_of(2));
        let end_key = Excluded(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (begin_key, end_key));

        // ----- excluded begin key: seeks at or before key 2 must land on key 3 -----
        for (seek_idx, expected_idx) in [(1, 3), (2, 3), (3, 3), (4, 6)] {
            ui.seek(iterator_test_bytes_user_key_of(seek_idx).as_ref())
                .await
                .unwrap();
            assert!(ui.is_valid());
            assert_eq!(
                ui.key(),
                iterator_test_bytes_key_of_epoch(expected_idx, 100).to_ref()
            );
        }
    }

    // ..=right
    #[tokio::test]
    async fn test_range_to_inclusive() {
        let sstable_store = mock_sstable_store().await;
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let mi = single_table_merge_iter(table, sstable_store, &sstable_info);
        let end_key = Included(iterator_test_bytes_user_key_of(7));

        let mut ui = UserIterator::for_test(mi, (Unbounded, end_key));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_remaining_keys!(ui, [(1, 200), (2, 300), (3, 100), (6, 100)]);

        // ----- begin-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(0).as_ref())
            .await
            .unwrap();
        assert_remaining_keys!(ui, [(1, 200), (2, 300), (3, 100), (6, 100)]);

        // ----- in-range iterate -----
        ui.seek(iterator_test_bytes_user_key_of(2).as_ref())
            .await
            .unwrap();
        assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100)]);

        // ----- end-range (7) and after-end-range (8) iterate -----
        for idx in [7, 8] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert!(!ui.is_valid());
        }
    }

    // left..
    #[tokio::test]
    async fn test_range_from() {
        let sstable_store = mock_sstable_store().await;
        let (table, sstable_info) = generate_test_data(sstable_store.clone()).await;
        let mi = single_table_merge_iter(table, sstable_store, &sstable_info);
        let begin_key = Included(iterator_test_bytes_user_key_of(2));

        let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded));

        // ----- basic iterate -----
        ui.rewind().await.unwrap();
        assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100), (8, 100)]);

        // ----- before-begin-range (1) and begin-range (2) iterate -----
        for idx in [1, 2] {
            ui.seek(iterator_test_bytes_user_key_of(idx).as_ref())
                .await
                .unwrap();
            assert_remaining_keys!(ui, [(2, 300), (3, 100), (6, 100), (8, 100)]);
        }

        // ----- last-key iterate -----
        ui.seek(iterator_test_bytes_user_key_of(8).as_ref())
            .await
            .unwrap();
        assert_remaining_keys!(ui, [(8, 100)]);

        // ----- after-last-key iterate -----
        ui.seek(iterator_test_bytes_user_key_of(9).as_ref())
            .await
            .unwrap();
        assert!(!ui.is_valid());
    }

    #[tokio::test]
    async fn test_min_epoch() {
        let sstable_store = mock_sstable_store().await;
        let (table0, sstable_info_0) = gen_iterator_test_sstable_with_incr_epoch(
            0,
            default_builder_opt_for_test(),
            |x| x * 3,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
            1,
        )
        .await;
        let mi = single_table_merge_iter(table0, sstable_store.clone(), &sstable_info_0);

        let min_count = (TEST_KEYS_COUNT / 5) as u64;
        let min_epoch = test_epoch(min_count);
        let mut ui =
            UserIterator::for_test_with_epoch(mi, (Unbounded, Unbounded), u64::MAX, min_epoch);
        ui.rewind().await.unwrap();

        // Every exposed key must satisfy the inclusive lower epoch bound.
        let mut i = 0;
        while ui.is_valid() {
            let key = ui.key();
            let key_epoch = key.epoch_with_gap.pure_epoch();
            assert!(key_epoch >= min_epoch);

            i += 1;
            ui.next().await.unwrap();
        }

        let expect_count = TEST_KEYS_COUNT - (min_epoch / test_epoch(1)) as usize + 1;
        assert_eq!(i, expect_count);
    }
}
883}