// risingwave_storage/hummock/sstable/forward_sstable_iterator.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
30pub trait SstableIteratorType: HummockIterator + 'static {
31    fn create(
32        sstable: TableHolder,
33        sstable_store: SstableStoreRef,
34        read_options: Arc<SstableIteratorReadOptions>,
35        sstable_info_ref: &SstableInfo,
36    ) -> Self;
37}
38
/// Iterates on a sstable.
///
/// Only the blocks in `block_start_idx_inclusive..=block_end_idx_inclusive` are ever read; that
/// range is derived in `new` from the table ids of the `SstableInfo` the iterator was created for.
pub struct SstableIterator {
    /// The iterator of the current block.
    /// `None` before the first positioning and after moving past `block_end_idx_inclusive`.
    block_iter: Option<BlockIterator>,

    /// Current block index (absolute index into `sst.meta.block_metas`).
    cur_idx: usize,

    /// Optional stream of consecutive blocks created by `sstable_store.prefetch_blocks`. When it
    /// is positioned at or before the requested block, blocks are taken from it instead of
    /// being fetched one by one.
    preload_stream: Option<Box<dyn BlockStream>>,
    /// Reference to the sst
    pub sst: TableHolder,
    /// Exclusive end block index of the prefetch range; `0` means prefetching is disabled.
    preload_end_block_idx: usize,
    /// Number of times the prefetch stream has been recreated after a read error; capped by
    /// `options.max_preload_retry_times`.
    preload_retry_times: usize,

    sstable_store: SstableStoreRef,
    /// Statistics accumulated by this iterator; merged into the caller's statistics by
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
    options: Arc<SstableIteratorReadOptions>,

    // used for checking if the block is valid, filter out the block that is not in the table-id range
    // Both bounds are inclusive absolute indices into `sst.meta.block_metas`.
    block_start_idx_inclusive: usize,
    block_end_idx_inclusive: usize,
}
61
62impl SstableIterator {
63    pub fn new(
64        sstable: TableHolder,
65        sstable_store: SstableStoreRef,
66        options: Arc<SstableIteratorReadOptions>,
67        sstable_info_ref: &SstableInfo,
68    ) -> Self {
69        let mut block_start_idx_inclusive = 0;
70        let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71        let read_table_id_range = (
72            *sstable_info_ref.table_ids.first().unwrap(),
73            *sstable_info_ref.table_ids.last().unwrap(),
74        );
75        assert!(
76            read_table_id_range.0 <= read_table_id_range.1,
77            "invalid table id range {} - {}",
78            read_table_id_range.0,
79            read_table_id_range.1
80        );
81        let block_meta_count = sstable.meta.block_metas.len();
82        assert!(block_meta_count > 0);
83        assert!(
84            sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
85            "table id {} not found table_ids in block_meta {:?}",
86            read_table_id_range.0,
87            sstable
88                .meta
89                .block_metas
90                .iter()
91                .map(|meta| meta.table_id())
92                .collect::<Vec<_>>()
93        );
94        assert!(
95            sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
96            "table id {} not found table_ids in block_meta {:?}",
97            read_table_id_range.1,
98            sstable
99                .meta
100                .block_metas
101                .iter()
102                .map(|meta| meta.table_id())
103                .collect::<Vec<_>>()
104        );
105
106        while block_start_idx_inclusive < block_meta_count
107            && sstable.meta.block_metas[block_start_idx_inclusive].table_id()
108                < read_table_id_range.0
109        {
110            block_start_idx_inclusive += 1;
111        }
112        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
113        assert!(
114            block_start_idx_inclusive < block_meta_count,
115            "table id {} not found table_ids in block_meta {:?}",
116            read_table_id_range.0,
117            sstable
118                .meta
119                .block_metas
120                .iter()
121                .map(|meta| meta.table_id())
122                .collect::<Vec<_>>()
123        );
124
125        while block_end_idx_inclusive > block_start_idx_inclusive
126            && sstable.meta.block_metas[block_end_idx_inclusive].table_id() > read_table_id_range.1
127        {
128            block_end_idx_inclusive -= 1;
129        }
130        assert!(
131            block_end_idx_inclusive >= block_start_idx_inclusive,
132            "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
133            block_end_idx_inclusive,
134            block_start_idx_inclusive,
135            block_meta_count
136        );
137
138        Self {
139            block_iter: None,
140            cur_idx: 0,
141            preload_stream: None,
142            sst: sstable,
143            sstable_store,
144            stats: StoreLocalStatistic::default(),
145            options,
146            preload_end_block_idx: 0,
147            preload_retry_times: 0,
148            block_start_idx_inclusive,
149            block_end_idx_inclusive,
150        }
151    }
152
153    fn init_block_prefetch_range(&mut self, start_idx: usize) {
154        assert!(
155            start_idx >= self.block_start_idx_inclusive
156                && start_idx <= self.block_end_idx_inclusive
157        );
158
159        self.preload_end_block_idx = 0;
160        if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
161            let block_metas = &self.sst.meta.block_metas
162                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
163            let next_to_start_idx = start_idx + 1;
164            if next_to_start_idx <= self.block_end_idx_inclusive {
165                let end_idx = match bound {
166                    Unbounded => self.block_end_idx_inclusive + 1,
167                    Included(dest_key) => {
168                        let dest_key = dest_key.as_ref();
169                        self.block_start_idx_inclusive
170                            + block_metas.partition_point(|block_meta| {
171                                FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
172                            })
173                    }
174                    Excluded(end_key) => {
175                        let end_key = end_key.as_ref();
176                        self.block_start_idx_inclusive
177                            + block_metas.partition_point(|block_meta| {
178                                FullKey::decode(&block_meta.smallest_key).user_key < end_key
179                            })
180                    }
181                };
182
183                // `preload_end_block_idx` is exclusive
184                if next_to_start_idx < end_idx {
185                    assert!(
186                        end_idx <= self.block_end_idx_inclusive + 1,
187                        "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
188                        end_idx,
189                        self.block_end_idx_inclusive,
190                        next_to_start_idx
191                    );
192                    self.preload_end_block_idx = end_idx;
193                }
194            }
195        }
196    }
197
198    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
199    async fn seek_idx(
200        &mut self,
201        idx: usize,
202        seek_key: Option<FullKey<&[u8]>>,
203    ) -> HummockResult<()> {
204        tracing::debug!(
205            target: "events::storage::sstable::block_seek",
206            "table iterator seek: sstable_object_id = {}, block_id = {}",
207            self.sst.id,
208            idx,
209        );
210
211        // When all data are in block cache, it is highly possible that this iterator will stay on a
212        // worker thread for a full time. Therefore, we use tokio's unstable API consume_budget to
213        // do cooperative scheduling.
214        tokio::task::consume_budget().await;
215
216        let mut hit_cache = false;
217        if idx > self.block_end_idx_inclusive {
218            self.block_iter = None;
219            return Ok(());
220        }
221        // Maybe the previous preload stream breaks on some cached block, so here we can try to preload some data again
222        if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
223            match self
224                .sstable_store
225                .prefetch_blocks(
226                    &self.sst,
227                    idx,
228                    self.preload_end_block_idx,
229                    self.options.cache_policy,
230                    &mut self.stats,
231                )
232                .instrument_await("prefetch_blocks".verbose())
233                .await
234            {
235                Ok(preload_stream) => self.preload_stream = Some(preload_stream),
236                Err(e) => {
237                    tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
238                }
239            }
240        }
241
242        if self
243            .preload_stream
244            .as_ref()
245            .map(|preload_stream| preload_stream.next_block_index() <= idx)
246            .unwrap_or(false)
247        {
248            while let Some(preload_stream) = self.preload_stream.as_mut() {
249                let mut ret = Ok(());
250                while preload_stream.next_block_index() < idx {
251                    if let Err(e) = preload_stream.next_block().await {
252                        ret = Err(e);
253                        break;
254                    }
255                }
256                assert_eq!(preload_stream.next_block_index(), idx);
257                if ret.is_ok() {
258                    match preload_stream.next_block().await {
259                        Ok(Some(block)) => {
260                            hit_cache = true;
261                            self.block_iter = Some(BlockIterator::new(block));
262                            break;
263                        }
264                        Ok(None) => {
265                            self.preload_stream.take();
266                        }
267                        Err(e) => {
268                            self.preload_stream.take();
269                            ret = Err(e);
270                        }
271                    }
272                } else {
273                    self.preload_stream.take();
274                }
275                if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
276                    if let Err(e) = ret {
277                        tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
278                        if self.preload_retry_times >= self.options.max_preload_retry_times {
279                            break;
280                        }
281                        self.preload_retry_times += 1;
282                    }
283
284                    match self
285                        .sstable_store
286                        .prefetch_blocks(
287                            &self.sst,
288                            idx,
289                            self.preload_end_block_idx,
290                            self.options.cache_policy,
291                            &mut self.stats,
292                        )
293                        .instrument_await("prefetch_blocks".verbose())
294                        .await
295                    {
296                        Ok(stream) => {
297                            self.preload_stream = Some(stream);
298                        }
299                        Err(e) => {
300                            tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
301                            break;
302                        }
303                    }
304                }
305            }
306        }
307        if !hit_cache {
308            let block = self
309                .sstable_store
310                .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
311                .await?;
312            self.block_iter = Some(BlockIterator::new(block));
313        };
314        let block_iter = self.block_iter.as_mut().unwrap();
315        if let Some(key) = seek_key {
316            block_iter.seek(key);
317        } else {
318            block_iter.seek_to_first();
319        }
320
321        self.cur_idx = idx;
322
323        Ok(())
324    }
325
326    fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
327        self.block_start_idx_inclusive
328            + self.sst.meta.block_metas
329                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
330                .partition_point(|block_meta| {
331                    // compare by version comparator
332                    // Note: we are comparing against the `smallest_key` of the `block`, thus the
333                    // partition point should be `prev(<=)` instead of `<`.
334                    FullKey::decode(&block_meta.smallest_key).le(&key)
335                })
336                .saturating_sub(1) // considering the boundary of 0
337    }
338}
339
340impl HummockIterator for SstableIterator {
341    type Direction = Forward;
342
343    async fn next(&mut self) -> HummockResult<()> {
344        self.stats.total_key_count += 1;
345        let block_iter = self.block_iter.as_mut().expect("no block iter");
346        if !block_iter.try_next() {
347            // seek to next block
348            self.seek_idx(self.cur_idx + 1, None).await?;
349        }
350
351        Ok(())
352    }
353
354    fn key(&self) -> FullKey<&[u8]> {
355        self.block_iter.as_ref().expect("no block iter").key()
356    }
357
358    fn value(&self) -> HummockValue<&[u8]> {
359        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
360
361        HummockValue::from_slice(raw_value).expect("decode error")
362    }
363
364    fn is_valid(&self) -> bool {
365        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
366    }
367
368    async fn rewind(&mut self) -> HummockResult<()> {
369        self.init_block_prefetch_range(self.block_start_idx_inclusive);
370        // seek_idx will update the current block iter state
371        self.seek_idx(self.block_start_idx_inclusive, None).await?;
372        Ok(())
373    }
374
375    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
376        let block_idx = self.calculate_block_idx_by_key(key);
377        self.init_block_prefetch_range(block_idx);
378
379        self.seek_idx(block_idx, Some(key)).await?;
380        if !self.is_valid() {
381            // seek to next block
382            self.seek_idx(block_idx + 1, None).await?;
383        }
384        Ok(())
385    }
386
387    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
388        stats.add(&self.stats);
389    }
390
391    fn value_meta(&self) -> ValueMeta {
392        ValueMeta {
393            object_id: Some(self.sst.id),
394            block_id: Some(self.cur_idx as _),
395        }
396    }
397}
398
399impl SstableIteratorType for SstableIterator {
400    fn create(
401        sstable: TableHolder,
402        sstable_store: SstableStoreRef,
403        options: Arc<SstableIteratorReadOptions>,
404        sstable_info_ref: &SstableInfo,
405    ) -> Self {
406        SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
407    }
408}
409
#[cfg(test)]
mod tests {
    use std::collections::Bound;

    use bytes::Bytes;
    use foyer::Hint;
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{TableKey, UserKey};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::CachePolicy;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// Iterates the whole sstable from the start and checks that every key/value of the default
    /// test sstable comes back in order.
    async fn inner_test_forward_iterator(
        sstable_store: SstableStoreRef,
        handle: TableHolder,
        sstable_info: SstableInfo,
    ) {
        let mut sstable_iter = SstableIterator::create(
            handle,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &sstable_info,
        );
        let mut cnt = 0;
        sstable_iter.rewind().await.unwrap();

        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(key, test_key_of(cnt).to_ref());
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }

        assert_eq!(cnt, TEST_KEYS_COUNT);
    }

    /// Builds an sstable holding `keys` (with values `test_value_of(1)`, `test_value_of(2)`, ...)
    /// for table ids 1, 2 and 3, reads it back through an `SstableIterator` created for
    /// `read_sstable_info`, and returns every key yielded after `rewind`.
    async fn read_keys_restricted_to(
        sstable_store: SstableStoreRef,
        keys: &[FullKey<Vec<u8>>],
        read_sstable_info: SstableInfo,
    ) -> Vec<FullKey<Vec<u8>>> {
        let kv_pairs = keys
            .iter()
            .enumerate()
            .map(|(i, key)| (key.clone(), HummockValue::put(test_value_of(i + 1))))
            .collect_vec();
        let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
            default_builder_opt_for_test(),
            10,
            kv_pairs.into_iter(),
            sstable_store.clone(),
            vec![1, 2, 3],
        )
        .await;
        let mut sstable_iter = SstableIterator::create(
            sstable,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &read_sstable_info,
        );
        sstable_iter.rewind().await.unwrap();

        let mut keys_read = vec![];
        while sstable_iter.is_valid() {
            keys_read.push(sstable_iter.key().to_vec());
            sstable_iter.next().await.unwrap();
        }
        keys_read
    }

    #[tokio::test]
    async fn test_table_iterator() {
        // Build remote sstable
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // We should have at least 10 blocks, so that sstable iterator test could cover more code
        // path.
        assert!(sstable.meta.block_metas.len() > 10);

        inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
    }

    #[tokio::test]
    async fn test_table_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // We should have at least 10 blocks, so that sstable iterator test could cover more code
        // path.
        assert!(sstable.meta.block_metas.len() > 10);
        let mut sstable_iter = SstableIterator::create(
            sstable,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &sstable_info,
        );
        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
        let mut rng = thread_rng();
        all_key_to_test.shuffle(&mut rng);

        // We seek and access all the keys in random order
        for i in all_key_to_test {
            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref());
        }

        // Seek to key #500 and start iterating.
        sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
        for i in 500..TEST_KEYS_COUNT {
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // Seek to < first key
        let smallest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_aaaa_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(233),
        );
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        let key = sstable_iter.key();
        assert_eq!(key, test_key_of(0).to_ref());

        // Seek to > last key
        let largest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_zzzz_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(233),
        );
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // Seek to non-existing key
        for idx in 1..TEST_KEYS_COUNT {
            // Seek to the previous key of each existing key. e.g.,
            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
            // And we seek to `key_test_00001` (will produce `key_test_00002`), `key_test_00003`
            // (will produce `key_test_00004`).
            sstable_iter
                .seek(
                    FullKey::for_test(
                        TableId::default(),
                        [
                            VirtualNode::ZERO.to_be_bytes().as_slice(),
                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
                        ]
                        .concat(),
                        0,
                    )
                    .to_ref(),
                )
                .await
                .unwrap();

            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(idx).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    #[tokio::test]
    async fn test_prefetch_table_read() {
        let sstable_store = mock_sstable_store().await;
        // Build an sstable holding keys `0..TEST_KEYS_COUNT`.
        let kv_iter =
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
        let sst_info = gen_test_sstable_info(
            default_builder_opt_for_test(),
            0,
            kv_iter,
            sstable_store.clone(),
        )
        .await;

        // Prefetch up to (and including) a user key past the last key, i.e. to the end of the sst.
        let end_key = test_key_of(TEST_KEYS_COUNT);
        let uk = UserKey::new(
            end_key.user_key.table_id,
            TableKey(Bytes::from(end_key.user_key.table_key.0)),
        );
        let options = Arc::new(SstableIteratorReadOptions {
            cache_policy: CachePolicy::Fill(Hint::Normal),
            must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });
        let mut stats = StoreLocalStatistic::default();
        let mut sstable_iter = SstableIterator::create(
            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
            sstable_store.clone(),
            options.clone(),
            &sst_info,
        );
        let mut cnt = 1000;
        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(
                key,
                test_key_of(cnt).to_ref(),
                "fail at {}, get key :{:?}",
                cnt,
                String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
            );
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }
        assert_eq!(cnt, TEST_KEYS_COUNT);

        // Repeat with a fresh iterator. With `CachePolicy::Fill` the first pass has presumably
        // populated the block cache, so this pass exercises prefetching over cached blocks.
        let mut sstable_iter = SstableIterator::create(
            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
            sstable_store,
            options.clone(),
            &sst_info,
        );
        let mut cnt = 1000;
        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(key, test_key_of(cnt).to_ref());
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }
        assert_eq!(cnt, TEST_KEYS_COUNT);
    }

    #[tokio::test]
    async fn test_read_table_id_range() {
        {
            let sstable_store = mock_sstable_store().await;
            let (sstable, sstable_info) =
                gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                    .await;
            let mut sstable_iter = SstableIterator::create(
                sstable,
                sstable_store.clone(),
                Arc::new(SstableIteratorReadOptions::default()),
                &sstable_info,
            );
            sstable_iter.rewind().await.unwrap();
            assert!(sstable_iter.is_valid());
            assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
        }

        {
            let sstable_store = mock_sstable_store().await;
            // One key per table: key `key_test_0000{i}` belongs to table `i`.
            let make_key = |table_id: TableId, idx: usize| {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", idx).as_bytes());
                FullKey {
                    user_key: UserKey::for_test(table_id, table_key),
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };
            let k1 = make_key(TableId::from(1), 1);
            let k2 = make_key(TableId::from(2), 2);
            let k3 = make_key(TableId::from(3), 3);
            let all_keys = [k1.clone(), k2.clone(), k3.clone()];

            // Reading every table yields every key.
            let keys_read = read_keys_restricted_to(
                sstable_store.clone(),
                &all_keys,
                SstableInfo::from(SstableInfoInner {
                    table_ids: vec![1.into(), 2.into(), 3.into()],
                    ..Default::default()
                }),
            )
            .await;
            assert_eq!(keys_read, vec![k1.clone(), k2.clone(), k3.clone()]);

            // Restricting the right end of the table-id range drops the trailing table.
            let keys_read = read_keys_restricted_to(
                sstable_store.clone(),
                &all_keys,
                SstableInfo::from(SstableInfoInner {
                    table_ids: vec![1.into(), 2.into()],
                    ..Default::default()
                }),
            )
            .await;
            assert_eq!(keys_read, vec![k1.clone(), k2.clone()]);

            // Restricting the left end of the table-id range drops the leading table.
            let keys_read = read_keys_restricted_to(
                sstable_store.clone(),
                &all_keys,
                SstableInfo::from(SstableInfoInner {
                    table_ids: vec![2.into(), 3.into()],
                    ..Default::default()
                }),
            )
            .await;
            assert_eq!(keys_read, vec![k2.clone(), k3.clone()]);

            // Restricting both ends keeps only the middle table.
            let keys_read = read_keys_restricted_to(
                sstable_store.clone(),
                &all_keys,
                SstableInfo::from(SstableInfoInner {
                    table_ids: vec![2.into()],
                    ..Default::default()
                }),
            )
            .await;
            assert_eq!(keys_read, vec![k2]);
        }
    }
}