risingwave_storage/hummock/sstable/
forward_sstable_iterator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
30pub trait SstableIteratorType: HummockIterator + 'static {
31    fn create(
32        sstable: TableHolder,
33        sstable_store: SstableStoreRef,
34        read_options: Arc<SstableIteratorReadOptions>,
35        sstable_info_ref: &SstableInfo,
36    ) -> Self;
37}
38
39/// Iterates on a sstable.
40pub struct SstableIterator {
41    /// The iterator of the current block.
42    block_iter: Option<BlockIterator>,
43
44    /// Current block index.
45    cur_idx: usize,
46
47    preload_stream: Option<Box<dyn BlockStream>>,
48    /// Reference to the sst
49    pub sst: TableHolder,
50    preload_end_block_idx: usize,
51    preload_retry_times: usize,
52
53    sstable_store: SstableStoreRef,
54    stats: StoreLocalStatistic,
55    options: Arc<SstableIteratorReadOptions>,
56
57    // used for checking if the block is valid, filter out the block that is not in the table-id range
58    block_start_idx_inclusive: usize,
59    block_end_idx_inclusive: usize,
60}
61
62impl SstableIterator {
63    pub fn new(
64        sstable: TableHolder,
65        sstable_store: SstableStoreRef,
66        options: Arc<SstableIteratorReadOptions>,
67        sstable_info_ref: &SstableInfo,
68    ) -> Self {
69        let mut block_start_idx_inclusive = 0;
70        let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71        let read_table_id_range = (
72            *sstable_info_ref.table_ids.first().unwrap(),
73            *sstable_info_ref.table_ids.last().unwrap(),
74        );
75        assert!(
76            read_table_id_range.0 <= read_table_id_range.1,
77            "invalid table id range {} - {}",
78            read_table_id_range.0,
79            read_table_id_range.1
80        );
81        let block_meta_count = sstable.meta.block_metas.len();
82        assert!(block_meta_count > 0);
83        assert!(
84            sstable.meta.block_metas[0].table_id().table_id() <= read_table_id_range.0,
85            "table id {} not found table_ids in block_meta {:?}",
86            read_table_id_range.0,
87            sstable
88                .meta
89                .block_metas
90                .iter()
91                .map(|meta| meta.table_id())
92                .collect::<Vec<_>>()
93        );
94        assert!(
95            sstable.meta.block_metas[block_meta_count - 1]
96                .table_id()
97                .table_id()
98                >= read_table_id_range.1,
99            "table id {} not found table_ids in block_meta {:?}",
100            read_table_id_range.1,
101            sstable
102                .meta
103                .block_metas
104                .iter()
105                .map(|meta| meta.table_id())
106                .collect::<Vec<_>>()
107        );
108
109        while block_start_idx_inclusive < block_meta_count
110            && sstable.meta.block_metas[block_start_idx_inclusive]
111                .table_id()
112                .table_id()
113                < read_table_id_range.0
114        {
115            block_start_idx_inclusive += 1;
116        }
117        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
118        assert!(
119            block_start_idx_inclusive < block_meta_count,
120            "table id {} not found table_ids in block_meta {:?}",
121            read_table_id_range.0,
122            sstable
123                .meta
124                .block_metas
125                .iter()
126                .map(|meta| meta.table_id())
127                .collect::<Vec<_>>()
128        );
129
130        while block_end_idx_inclusive > block_start_idx_inclusive
131            && sstable.meta.block_metas[block_end_idx_inclusive]
132                .table_id()
133                .table_id()
134                > read_table_id_range.1
135        {
136            block_end_idx_inclusive -= 1;
137        }
138        assert!(
139            block_end_idx_inclusive >= block_start_idx_inclusive,
140            "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
141            block_end_idx_inclusive,
142            block_start_idx_inclusive,
143            block_meta_count
144        );
145
146        Self {
147            block_iter: None,
148            cur_idx: 0,
149            preload_stream: None,
150            sst: sstable,
151            sstable_store,
152            stats: StoreLocalStatistic::default(),
153            options,
154            preload_end_block_idx: 0,
155            preload_retry_times: 0,
156            block_start_idx_inclusive,
157            block_end_idx_inclusive,
158        }
159    }
160
161    fn init_block_prefetch_range(&mut self, start_idx: usize) {
162        assert!(
163            start_idx >= self.block_start_idx_inclusive
164                && start_idx <= self.block_end_idx_inclusive
165        );
166
167        self.preload_end_block_idx = 0;
168        if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
169            let block_metas = &self.sst.meta.block_metas
170                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
171            let next_to_start_idx = start_idx + 1;
172            if next_to_start_idx <= self.block_end_idx_inclusive {
173                let end_idx = match bound {
174                    Unbounded => self.block_end_idx_inclusive + 1,
175                    Included(dest_key) => {
176                        let dest_key = dest_key.as_ref();
177                        self.block_start_idx_inclusive
178                            + block_metas.partition_point(|block_meta| {
179                                FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
180                            })
181                    }
182                    Excluded(end_key) => {
183                        let end_key = end_key.as_ref();
184                        self.block_start_idx_inclusive
185                            + block_metas.partition_point(|block_meta| {
186                                FullKey::decode(&block_meta.smallest_key).user_key < end_key
187                            })
188                    }
189                };
190
191                // `preload_end_block_idx` is exclusive
192                if next_to_start_idx < end_idx {
193                    assert!(
194                        end_idx <= self.block_end_idx_inclusive + 1,
195                        "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
196                        end_idx,
197                        self.block_end_idx_inclusive,
198                        next_to_start_idx
199                    );
200                    self.preload_end_block_idx = end_idx;
201                }
202            }
203        }
204    }
205
206    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
207    async fn seek_idx(
208        &mut self,
209        idx: usize,
210        seek_key: Option<FullKey<&[u8]>>,
211    ) -> HummockResult<()> {
212        tracing::debug!(
213            target: "events::storage::sstable::block_seek",
214            "table iterator seek: sstable_object_id = {}, block_id = {}",
215            self.sst.id,
216            idx,
217        );
218
219        // When all data are in block cache, it is highly possible that this iterator will stay on a
220        // worker thread for a full time. Therefore, we use tokio's unstable API consume_budget to
221        // do cooperative scheduling.
222        tokio::task::consume_budget().await;
223
224        let mut hit_cache = false;
225        if idx > self.block_end_idx_inclusive {
226            self.block_iter = None;
227            return Ok(());
228        }
229        // Maybe the previous preload stream breaks on some cached block, so here we can try to preload some data again
230        if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
231            match self
232                .sstable_store
233                .prefetch_blocks(
234                    &self.sst,
235                    idx,
236                    self.preload_end_block_idx,
237                    self.options.cache_policy,
238                    &mut self.stats,
239                )
240                .instrument_await("prefetch_blocks".verbose())
241                .await
242            {
243                Ok(preload_stream) => self.preload_stream = Some(preload_stream),
244                Err(e) => {
245                    tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
246                }
247            }
248        }
249
250        if self
251            .preload_stream
252            .as_ref()
253            .map(|preload_stream| preload_stream.next_block_index() <= idx)
254            .unwrap_or(false)
255        {
256            while let Some(preload_stream) = self.preload_stream.as_mut() {
257                let mut ret = Ok(());
258                while preload_stream.next_block_index() < idx {
259                    if let Err(e) = preload_stream.next_block().await {
260                        ret = Err(e);
261                        break;
262                    }
263                }
264                assert_eq!(preload_stream.next_block_index(), idx);
265                if ret.is_ok() {
266                    match preload_stream.next_block().await {
267                        Ok(Some(block)) => {
268                            hit_cache = true;
269                            self.block_iter = Some(BlockIterator::new(block));
270                            break;
271                        }
272                        Ok(None) => {
273                            self.preload_stream.take();
274                        }
275                        Err(e) => {
276                            self.preload_stream.take();
277                            ret = Err(e);
278                        }
279                    }
280                } else {
281                    self.preload_stream.take();
282                }
283                if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
284                    if let Err(e) = ret {
285                        tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
286                        if self.preload_retry_times >= self.options.max_preload_retry_times {
287                            break;
288                        }
289                        self.preload_retry_times += 1;
290                    }
291
292                    match self
293                        .sstable_store
294                        .prefetch_blocks(
295                            &self.sst,
296                            idx,
297                            self.preload_end_block_idx,
298                            self.options.cache_policy,
299                            &mut self.stats,
300                        )
301                        .instrument_await("prefetch_blocks".verbose())
302                        .await
303                    {
304                        Ok(stream) => {
305                            self.preload_stream = Some(stream);
306                        }
307                        Err(e) => {
308                            tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
309                            break;
310                        }
311                    }
312                }
313            }
314        }
315        if !hit_cache {
316            let block = self
317                .sstable_store
318                .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
319                .await?;
320            self.block_iter = Some(BlockIterator::new(block));
321        };
322        let block_iter = self.block_iter.as_mut().unwrap();
323        if let Some(key) = seek_key {
324            block_iter.seek(key);
325        } else {
326            block_iter.seek_to_first();
327        }
328
329        self.cur_idx = idx;
330
331        Ok(())
332    }
333
334    fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
335        self.block_start_idx_inclusive
336            + self.sst.meta.block_metas
337                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
338                .partition_point(|block_meta| {
339                    // compare by version comparator
340                    // Note: we are comparing against the `smallest_key` of the `block`, thus the
341                    // partition point should be `prev(<=)` instead of `<`.
342                    FullKey::decode(&block_meta.smallest_key).le(&key)
343                })
344                .saturating_sub(1) // considering the boundary of 0
345    }
346}
347
348impl HummockIterator for SstableIterator {
349    type Direction = Forward;
350
351    async fn next(&mut self) -> HummockResult<()> {
352        self.stats.total_key_count += 1;
353        let block_iter = self.block_iter.as_mut().expect("no block iter");
354        if !block_iter.try_next() {
355            // seek to next block
356            self.seek_idx(self.cur_idx + 1, None).await?;
357        }
358
359        Ok(())
360    }
361
362    fn key(&self) -> FullKey<&[u8]> {
363        self.block_iter.as_ref().expect("no block iter").key()
364    }
365
366    fn value(&self) -> HummockValue<&[u8]> {
367        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
368
369        HummockValue::from_slice(raw_value).expect("decode error")
370    }
371
372    fn is_valid(&self) -> bool {
373        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
374    }
375
376    async fn rewind(&mut self) -> HummockResult<()> {
377        self.init_block_prefetch_range(self.block_start_idx_inclusive);
378        // seek_idx will update the current block iter state
379        self.seek_idx(self.block_start_idx_inclusive, None).await?;
380        Ok(())
381    }
382
383    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
384        let block_idx = self.calculate_block_idx_by_key(key);
385        self.init_block_prefetch_range(block_idx);
386
387        self.seek_idx(block_idx, Some(key)).await?;
388        if !self.is_valid() {
389            // seek to next block
390            self.seek_idx(block_idx + 1, None).await?;
391        }
392        Ok(())
393    }
394
395    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
396        stats.add(&self.stats);
397    }
398
399    fn value_meta(&self) -> ValueMeta {
400        ValueMeta {
401            object_id: Some(self.sst.id),
402            block_id: Some(self.cur_idx as _),
403        }
404    }
405}
406
407impl SstableIteratorType for SstableIterator {
408    fn create(
409        sstable: TableHolder,
410        sstable_store: SstableStoreRef,
411        options: Arc<SstableIteratorReadOptions>,
412        sstable_info_ref: &SstableInfo,
413    ) -> Self {
414        SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use std::collections::Bound;
421
422    use bytes::Bytes;
423    use foyer::CacheHint;
424    use itertools::Itertools;
425    use rand::prelude::*;
426    use rand::rng as thread_rng;
427    use risingwave_common::catalog::TableId;
428    use risingwave_common::hash::VirtualNode;
429    use risingwave_common::util::epoch::test_epoch;
430    use risingwave_hummock_sdk::EpochWithGap;
431    use risingwave_hummock_sdk::key::{TableKey, UserKey};
432    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
433
434    use super::*;
435    use crate::assert_bytes_eq;
436    use crate::hummock::CachePolicy;
437    use crate::hummock::iterator::test_utils::mock_sstable_store;
438    use crate::hummock::test_utils::{
439        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
440        gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
441    };
442
443    async fn inner_test_forward_iterator(
444        sstable_store: SstableStoreRef,
445        handle: TableHolder,
446        sstable_info: SstableInfo,
447    ) {
448        // We should have at least 10 blocks, so that sstable iterator test could cover more code
449        // path.
450        let mut sstable_iter = SstableIterator::create(
451            handle,
452            sstable_store,
453            Arc::new(SstableIteratorReadOptions::default()),
454            &sstable_info,
455        );
456        let mut cnt = 0;
457        sstable_iter.rewind().await.unwrap();
458
459        while sstable_iter.is_valid() {
460            let key = sstable_iter.key();
461            let value = sstable_iter.value();
462            assert_eq!(key, test_key_of(cnt).to_ref());
463            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
464            cnt += 1;
465            sstable_iter.next().await.unwrap();
466        }
467
468        assert_eq!(cnt, TEST_KEYS_COUNT);
469    }
470
471    #[tokio::test]
472    async fn test_table_iterator() {
473        // Build remote sstable
474        let sstable_store = mock_sstable_store().await;
475        let (sstable, sstable_info) =
476            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
477                .await;
478        // We should have at least 10 blocks, so that sstable iterator test could cover more code
479        // path.
480        assert!(sstable.meta.block_metas.len() > 10);
481
482        inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
483    }
484
485    #[tokio::test]
486    async fn test_table_seek() {
487        let sstable_store = mock_sstable_store().await;
488        let (sstable, sstable_info) =
489            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
490                .await;
491        // We should have at least 10 blocks, so that sstable iterator test could cover more code
492        // path.
493        assert!(sstable.meta.block_metas.len() > 10);
494        let mut sstable_iter = SstableIterator::create(
495            sstable,
496            sstable_store,
497            Arc::new(SstableIteratorReadOptions::default()),
498            &sstable_info,
499        );
500        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
501        let mut rng = thread_rng();
502        all_key_to_test.shuffle(&mut rng);
503
504        // We seek and access all the keys in random order
505        for i in all_key_to_test {
506            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
507            // sstable_iter.next().await.unwrap();
508            let key = sstable_iter.key();
509            assert_eq!(key, test_key_of(i).to_ref());
510        }
511
512        // Seek to key #500 and start iterating.
513        sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
514        for i in 500..TEST_KEYS_COUNT {
515            let key = sstable_iter.key();
516            assert_eq!(key, test_key_of(i).to_ref());
517            sstable_iter.next().await.unwrap();
518        }
519        assert!(!sstable_iter.is_valid());
520
521        // Seek to < first key
522        let smallest_key = FullKey::for_test(
523            TableId::default(),
524            [
525                VirtualNode::ZERO.to_be_bytes().as_slice(),
526                format!("key_aaaa_{:05}", 0).as_bytes(),
527            ]
528            .concat(),
529            test_epoch(233),
530        );
531        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
532        let key = sstable_iter.key();
533        assert_eq!(key, test_key_of(0).to_ref());
534
535        // Seek to > last key
536        let largest_key = FullKey::for_test(
537            TableId::default(),
538            [
539                VirtualNode::ZERO.to_be_bytes().as_slice(),
540                format!("key_zzzz_{:05}", 0).as_bytes(),
541            ]
542            .concat(),
543            test_epoch(233),
544        );
545        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
546        assert!(!sstable_iter.is_valid());
547
548        // Seek to non-existing key
549        for idx in 1..TEST_KEYS_COUNT {
550            // Seek to the previous key of each existing key. e.g.,
551            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
552            // And we seek to `key_test_00001` (will produce `key_test_00002`), `key_test_00003`
553            // (will produce `key_test_00004`).
554            sstable_iter
555                .seek(
556                    FullKey::for_test(
557                        TableId::default(),
558                        [
559                            VirtualNode::ZERO.to_be_bytes().as_slice(),
560                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
561                        ]
562                        .concat(),
563                        0,
564                    )
565                    .to_ref(),
566                )
567                .await
568                .unwrap();
569
570            let key = sstable_iter.key();
571            assert_eq!(key, test_key_of(idx).to_ref());
572            sstable_iter.next().await.unwrap();
573        }
574        assert!(!sstable_iter.is_valid());
575    }
576
577    #[tokio::test]
578    async fn test_prefetch_table_read() {
579        let sstable_store = mock_sstable_store().await;
580        // when upload data is successful, but upload meta is fail and delete is fail
581        let kv_iter =
582            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
583        let sst_info = gen_test_sstable_info(
584            default_builder_opt_for_test(),
585            0,
586            kv_iter,
587            sstable_store.clone(),
588        )
589        .await;
590
591        let end_key = test_key_of(TEST_KEYS_COUNT);
592        let uk = UserKey::new(
593            end_key.user_key.table_id,
594            TableKey(Bytes::from(end_key.user_key.table_key.0)),
595        );
596        let options = Arc::new(SstableIteratorReadOptions {
597            cache_policy: CachePolicy::Fill(CacheHint::Normal),
598            must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
599            max_preload_retry_times: 0,
600            prefetch_for_large_query: false,
601        });
602        let mut stats = StoreLocalStatistic::default();
603        let mut sstable_iter = SstableIterator::create(
604            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
605            sstable_store.clone(),
606            options.clone(),
607            &sst_info,
608        );
609        let mut cnt = 1000;
610        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
611        while sstable_iter.is_valid() {
612            let key = sstable_iter.key();
613            let value = sstable_iter.value();
614            assert_eq!(
615                key,
616                test_key_of(cnt).to_ref(),
617                "fail at {}, get key :{:?}",
618                cnt,
619                String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
620            );
621            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
622            cnt += 1;
623            sstable_iter.next().await.unwrap();
624        }
625        assert_eq!(cnt, TEST_KEYS_COUNT);
626        let mut sstable_iter = SstableIterator::create(
627            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
628            sstable_store,
629            options.clone(),
630            &sst_info,
631        );
632        let mut cnt = 1000;
633        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
634        while sstable_iter.is_valid() {
635            let key = sstable_iter.key();
636            let value = sstable_iter.value();
637            assert_eq!(key, test_key_of(cnt).to_ref());
638            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
639            cnt += 1;
640            sstable_iter.next().await.unwrap();
641        }
642        assert_eq!(cnt, TEST_KEYS_COUNT);
643    }
644
645    #[tokio::test]
646    async fn test_read_table_id_range() {
647        {
648            let sstable_store = mock_sstable_store().await;
649            let (sstable, sstable_info) =
650                gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
651                    .await;
652            let mut sstable_iter = SstableIterator::create(
653                sstable,
654                sstable_store.clone(),
655                Arc::new(SstableIteratorReadOptions::default()),
656                &sstable_info,
657            );
658            sstable_iter.rewind().await.unwrap();
659            assert!(sstable_iter.is_valid());
660            assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
661        }
662
663        {
664            let sstable_store = mock_sstable_store().await;
665            // test key_range right
666            let k1 = {
667                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
668                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
669                let uk = UserKey::for_test(TableId::from(1), table_key);
670                FullKey {
671                    user_key: uk,
672                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
673                }
674            };
675
676            let k2 = {
677                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
678                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
679                let uk = UserKey::for_test(TableId::from(2), table_key);
680                FullKey {
681                    user_key: uk,
682                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
683                }
684            };
685
686            let k3 = {
687                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
688                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
689                let uk = UserKey::for_test(TableId::from(3), table_key);
690                FullKey {
691                    user_key: uk,
692                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
693                }
694            };
695
696            {
697                let kv_pairs = vec![
698                    (k1.clone(), HummockValue::put(test_value_of(1))),
699                    (k2.clone(), HummockValue::put(test_value_of(2))),
700                    (k3.clone(), HummockValue::put(test_value_of(3))),
701                ];
702
703                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
704                    default_builder_opt_for_test(),
705                    10,
706                    kv_pairs.into_iter(),
707                    sstable_store.clone(),
708                    vec![1, 2, 3],
709                )
710                .await;
711                let mut sstable_iter = SstableIterator::create(
712                    sstable,
713                    sstable_store.clone(),
714                    Arc::new(SstableIteratorReadOptions::default()),
715                    &SstableInfo::from(SstableInfoInner {
716                        table_ids: vec![1, 2, 3],
717                        ..Default::default()
718                    }),
719                );
720                sstable_iter.rewind().await.unwrap();
721                assert!(sstable_iter.is_valid());
722                assert!(sstable_iter.key().eq(&k1.to_ref()));
723
724                let mut cnt = 0;
725                let mut last_key = k1.clone();
726                while sstable_iter.is_valid() {
727                    last_key = sstable_iter.key().to_vec();
728                    cnt += 1;
729                    sstable_iter.next().await.unwrap();
730                }
731
732                assert_eq!(3, cnt);
733                assert_eq!(last_key, k3.clone());
734            }
735
736            {
737                let kv_pairs = vec![
738                    (k1.clone(), HummockValue::put(test_value_of(1))),
739                    (k2.clone(), HummockValue::put(test_value_of(2))),
740                    (k3.clone(), HummockValue::put(test_value_of(3))),
741                ];
742
743                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
744                    default_builder_opt_for_test(),
745                    10,
746                    kv_pairs.into_iter(),
747                    sstable_store.clone(),
748                    vec![1, 2, 3],
749                )
750                .await;
751
752                let mut sstable_iter = SstableIterator::create(
753                    sstable,
754                    sstable_store.clone(),
755                    Arc::new(SstableIteratorReadOptions::default()),
756                    &SstableInfo::from(SstableInfoInner {
757                        table_ids: vec![1, 2],
758                        ..Default::default()
759                    }),
760                );
761                sstable_iter.rewind().await.unwrap();
762                assert!(sstable_iter.is_valid());
763                assert!(sstable_iter.key().eq(&k1.to_ref()));
764
765                let mut cnt = 0;
766                let mut last_key = k1.clone();
767                while sstable_iter.is_valid() {
768                    last_key = sstable_iter.key().to_vec();
769                    cnt += 1;
770                    sstable_iter.next().await.unwrap();
771                }
772
773                assert_eq!(2, cnt);
774                assert_eq!(last_key, k2.clone());
775            }
776
777            {
778                let kv_pairs = vec![
779                    (k1.clone(), HummockValue::put(test_value_of(1))),
780                    (k2.clone(), HummockValue::put(test_value_of(2))),
781                    (k3.clone(), HummockValue::put(test_value_of(3))),
782                ];
783
784                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
785                    default_builder_opt_for_test(),
786                    10,
787                    kv_pairs.into_iter(),
788                    sstable_store.clone(),
789                    vec![1, 2, 3],
790                )
791                .await;
792
793                let mut sstable_iter = SstableIterator::create(
794                    sstable,
795                    sstable_store.clone(),
796                    Arc::new(SstableIteratorReadOptions::default()),
797                    &SstableInfo::from(SstableInfoInner {
798                        table_ids: vec![2, 3],
799                        ..Default::default()
800                    }),
801                );
802                sstable_iter.rewind().await.unwrap();
803                assert!(sstable_iter.is_valid());
804                assert!(sstable_iter.key().eq(&k2.to_ref()));
805
806                let mut cnt = 0;
807                let mut last_key = k1.clone();
808                while sstable_iter.is_valid() {
809                    last_key = sstable_iter.key().to_vec();
810                    cnt += 1;
811                    sstable_iter.next().await.unwrap();
812                }
813
814                assert_eq!(2, cnt);
815                assert_eq!(last_key, k3.clone());
816            }
817
818            {
819                let kv_pairs = vec![
820                    (k1.clone(), HummockValue::put(test_value_of(1))),
821                    (k2.clone(), HummockValue::put(test_value_of(2))),
822                    (k3.clone(), HummockValue::put(test_value_of(3))),
823                ];
824
825                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
826                    default_builder_opt_for_test(),
827                    10,
828                    kv_pairs.into_iter(),
829                    sstable_store.clone(),
830                    vec![1, 2, 3],
831                )
832                .await;
833
834                let mut sstable_iter = SstableIterator::create(
835                    sstable,
836                    sstable_store.clone(),
837                    Arc::new(SstableIteratorReadOptions::default()),
838                    &SstableInfo::from(SstableInfoInner {
839                        table_ids: vec![2],
840                        ..Default::default()
841                    }),
842                );
843                sstable_iter.rewind().await.unwrap();
844                assert!(sstable_iter.is_valid());
845                assert!(sstable_iter.key().eq(&k2.to_ref()));
846
847                let mut cnt = 0;
848                let mut last_key = k1.clone();
849                while sstable_iter.is_valid() {
850                    last_key = sstable_iter.key().to_vec();
851                    cnt += 1;
852                    sstable_iter.next().await.unwrap();
853                }
854
855                assert_eq!(1, cnt);
856                assert_eq!(last_key, k2.clone());
857            }
858        }
859    }
860}