risingwave_storage/hummock/sstable/
forward_sstable_iterator.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
/// A [`HummockIterator`] that can be constructed from an sstable handle.
///
/// The trait only adds a uniform constructor on top of `HummockIterator`, so code that is
/// generic over the iterator type can build one without naming the concrete type.
pub trait SstableIteratorType: HummockIterator + 'static {
    /// Builds an iterator over `sstable`.
    ///
    /// * `sstable` - handle of the sstable to iterate.
    /// * `sstable_store` - store handed to the iterator for loading blocks.
    /// * `read_options` - shared read options for the iterator.
    /// * `sstable_info_ref` - sstable info passed through to the implementation.
    fn create(
        sstable: TableHolder,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
        sstable_info_ref: &SstableInfo,
    ) -> Self;
}
38
/// Iterates on a sstable.
///
/// The iterator walks the blocks in `[block_start_idx_inclusive, block_end_idx_inclusive]` one
/// at a time, keeping a [`BlockIterator`] for the block it is currently positioned on.
pub struct SstableIterator {
    /// The iterator of the current block. `None` until the first seek/rewind and after the
    /// iterator has moved past `block_end_idx_inclusive`.
    block_iter: Option<BlockIterator>,

    /// Current block index.
    cur_idx: usize,

    /// Open prefetch stream, if any. Blocks are pulled from it in preference to issuing
    /// individual block reads.
    preload_stream: Option<Box<dyn BlockStream>>,
    /// Reference to the sst
    pub sst: TableHolder,
    /// Exclusive end block index of the prefetch range. `0` when no prefetch range is set.
    preload_end_block_idx: usize,
    /// Number of times the prefetch stream has been recreated after an error. Compared against
    /// `options.max_preload_retry_times`.
    preload_retry_times: usize,

    /// Store used to fetch and prefetch blocks.
    sstable_store: SstableStoreRef,
    /// Statistics accumulated by this iterator, reported via `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Read options shared with the creator of this iterator.
    options: Arc<SstableIteratorReadOptions>,

    // used for checking if the block is valid, filter out the block that is not in the table-id range
    /// Index of the first block this iterator may read.
    block_start_idx_inclusive: usize,
    /// Index of the last block this iterator may read.
    block_end_idx_inclusive: usize,
}
61
62impl SstableIterator {
63    pub fn new(
64        sstable: TableHolder,
65        sstable_store: SstableStoreRef,
66        options: Arc<SstableIteratorReadOptions>,
67        sstable_info_ref: &SstableInfo,
68    ) -> Self {
69        let mut block_start_idx_inclusive = 0;
70        let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71        assert!(
72            !sstable_info_ref.table_ids.is_empty(),
73            "SstableIterator: SST {} (object {}) has empty table_ids",
74            sstable_info_ref.sst_id,
75            sstable_info_ref.object_id,
76        );
77        let read_table_id_range = (
78            *sstable_info_ref.table_ids.first().unwrap(),
79            *sstable_info_ref.table_ids.last().unwrap(),
80        );
81        assert!(
82            read_table_id_range.0 <= read_table_id_range.1,
83            "invalid table id range {} - {}",
84            read_table_id_range.0,
85            read_table_id_range.1
86        );
87        let block_meta_count = sstable.meta.block_metas.len();
88        assert!(block_meta_count > 0);
89        assert!(
90            sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
91            "table id {} not found table_ids in block_meta {:?}",
92            read_table_id_range.0,
93            sstable
94                .meta
95                .block_metas
96                .iter()
97                .map(|meta| meta.table_id())
98                .collect::<Vec<_>>()
99        );
100        assert!(
101            sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
102            "table id {} not found table_ids in block_meta {:?}",
103            read_table_id_range.1,
104            sstable
105                .meta
106                .block_metas
107                .iter()
108                .map(|meta| meta.table_id())
109                .collect::<Vec<_>>()
110        );
111
112        while block_start_idx_inclusive < block_meta_count
113            && sstable.meta.block_metas[block_start_idx_inclusive].table_id()
114                < read_table_id_range.0
115        {
116            block_start_idx_inclusive += 1;
117        }
118        // We assume that the table id read must exist in the sstable, otherwise it is a fatal error.
119        assert!(
120            block_start_idx_inclusive < block_meta_count,
121            "table id {} not found table_ids in block_meta {:?}",
122            read_table_id_range.0,
123            sstable
124                .meta
125                .block_metas
126                .iter()
127                .map(|meta| meta.table_id())
128                .collect::<Vec<_>>()
129        );
130
131        while block_end_idx_inclusive > block_start_idx_inclusive
132            && sstable.meta.block_metas[block_end_idx_inclusive].table_id() > read_table_id_range.1
133        {
134            block_end_idx_inclusive -= 1;
135        }
136        assert!(
137            block_end_idx_inclusive >= block_start_idx_inclusive,
138            "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
139            block_end_idx_inclusive,
140            block_start_idx_inclusive,
141            block_meta_count
142        );
143
144        Self {
145            block_iter: None,
146            cur_idx: 0,
147            preload_stream: None,
148            sst: sstable,
149            sstable_store,
150            stats: StoreLocalStatistic::default(),
151            options,
152            preload_end_block_idx: 0,
153            preload_retry_times: 0,
154            block_start_idx_inclusive,
155            block_end_idx_inclusive,
156        }
157    }
158
159    fn init_block_prefetch_range(&mut self, start_idx: usize) {
160        assert!(
161            start_idx >= self.block_start_idx_inclusive
162                && start_idx <= self.block_end_idx_inclusive
163        );
164
165        self.preload_end_block_idx = 0;
166        if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
167            let block_metas = &self.sst.meta.block_metas
168                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
169            let next_to_start_idx = start_idx + 1;
170            if next_to_start_idx <= self.block_end_idx_inclusive {
171                let end_idx = match bound {
172                    Unbounded => self.block_end_idx_inclusive + 1,
173                    Included(dest_key) => {
174                        let dest_key = dest_key.as_ref();
175                        self.block_start_idx_inclusive
176                            + block_metas.partition_point(|block_meta| {
177                                FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
178                            })
179                    }
180                    Excluded(end_key) => {
181                        let end_key = end_key.as_ref();
182                        self.block_start_idx_inclusive
183                            + block_metas.partition_point(|block_meta| {
184                                FullKey::decode(&block_meta.smallest_key).user_key < end_key
185                            })
186                    }
187                };
188
189                // `preload_end_block_idx` is exclusive
190                if next_to_start_idx < end_idx {
191                    assert!(
192                        end_idx <= self.block_end_idx_inclusive + 1,
193                        "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
194                        end_idx,
195                        self.block_end_idx_inclusive,
196                        next_to_start_idx
197                    );
198                    self.preload_end_block_idx = end_idx;
199                }
200            }
201        }
202    }
203
204    /// Seeks to a block, and then seeks to the key if `seek_key` is given.
205    async fn seek_idx(
206        &mut self,
207        idx: usize,
208        seek_key: Option<FullKey<&[u8]>>,
209    ) -> HummockResult<()> {
210        tracing::debug!(
211            target: "events::storage::sstable::block_seek",
212            "table iterator seek: sstable_object_id = {}, block_id = {}",
213            self.sst.id,
214            idx,
215        );
216
217        // When all data are in block cache, it is highly possible that this iterator will stay on a
218        // worker thread for a full time. Therefore, we use tokio's unstable API consume_budget to
219        // do cooperative scheduling.
220        tokio::task::consume_budget().await;
221
222        let mut hit_cache = false;
223        if idx > self.block_end_idx_inclusive {
224            self.block_iter = None;
225            return Ok(());
226        }
227        // Maybe the previous preload stream breaks on some cached block, so here we can try to preload some data again
228        if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
229            match self
230                .sstable_store
231                .prefetch_blocks(
232                    &self.sst,
233                    idx,
234                    self.preload_end_block_idx,
235                    self.options.cache_policy,
236                    &mut self.stats,
237                )
238                .instrument_await("prefetch_blocks".verbose())
239                .await
240            {
241                Ok(preload_stream) => self.preload_stream = Some(preload_stream),
242                Err(e) => {
243                    tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
244                }
245            }
246        }
247
248        if self
249            .preload_stream
250            .as_ref()
251            .map(|preload_stream| preload_stream.next_block_index() <= idx)
252            .unwrap_or(false)
253        {
254            while let Some(preload_stream) = self.preload_stream.as_mut() {
255                let mut ret = Ok(());
256                while preload_stream.next_block_index() < idx {
257                    if let Err(e) = preload_stream.next_block().await {
258                        ret = Err(e);
259                        break;
260                    }
261                }
262                assert_eq!(preload_stream.next_block_index(), idx);
263                if ret.is_ok() {
264                    match preload_stream.next_block().await {
265                        Ok(Some(block)) => {
266                            hit_cache = true;
267                            self.block_iter = Some(BlockIterator::new(block));
268                            break;
269                        }
270                        Ok(None) => {
271                            self.preload_stream.take();
272                        }
273                        Err(e) => {
274                            self.preload_stream.take();
275                            ret = Err(e);
276                        }
277                    }
278                } else {
279                    self.preload_stream.take();
280                }
281                if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
282                    if let Err(e) = ret {
283                        tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
284                        if self.preload_retry_times >= self.options.max_preload_retry_times {
285                            break;
286                        }
287                        self.preload_retry_times += 1;
288                    }
289
290                    match self
291                        .sstable_store
292                        .prefetch_blocks(
293                            &self.sst,
294                            idx,
295                            self.preload_end_block_idx,
296                            self.options.cache_policy,
297                            &mut self.stats,
298                        )
299                        .instrument_await("prefetch_blocks".verbose())
300                        .await
301                    {
302                        Ok(stream) => {
303                            self.preload_stream = Some(stream);
304                        }
305                        Err(e) => {
306                            tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
307                            break;
308                        }
309                    }
310                }
311            }
312        }
313        if !hit_cache {
314            let block = self
315                .sstable_store
316                .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
317                .await?;
318            self.block_iter = Some(BlockIterator::new(block));
319        };
320        let block_iter = self.block_iter.as_mut().unwrap();
321        if let Some(key) = seek_key {
322            block_iter.seek(key);
323        } else {
324            block_iter.seek_to_first();
325        }
326
327        self.cur_idx = idx;
328
329        Ok(())
330    }
331
332    fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
333        self.block_start_idx_inclusive
334            + self.sst.meta.block_metas
335                [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
336                .partition_point(|block_meta| {
337                    // compare by version comparator
338                    // Note: we are comparing against the `smallest_key` of the `block`, thus the
339                    // partition point should be `prev(<=)` instead of `<`.
340                    FullKey::decode(&block_meta.smallest_key).le(&key)
341                })
342                .saturating_sub(1) // considering the boundary of 0
343    }
344}
345
346impl HummockIterator for SstableIterator {
347    type Direction = Forward;
348
349    async fn next(&mut self) -> HummockResult<()> {
350        self.stats.total_key_count += 1;
351        let block_iter = self.block_iter.as_mut().expect("no block iter");
352        if !block_iter.try_next() {
353            // seek to next block
354            self.seek_idx(self.cur_idx + 1, None).await?;
355        }
356
357        Ok(())
358    }
359
360    fn key(&self) -> FullKey<&[u8]> {
361        self.block_iter.as_ref().expect("no block iter").key()
362    }
363
364    fn value(&self) -> HummockValue<&[u8]> {
365        let raw_value = self.block_iter.as_ref().expect("no block iter").value();
366
367        HummockValue::from_slice(raw_value).expect("decode error")
368    }
369
370    fn is_valid(&self) -> bool {
371        self.block_iter.as_ref().is_some_and(|i| i.is_valid())
372    }
373
374    async fn rewind(&mut self) -> HummockResult<()> {
375        self.init_block_prefetch_range(self.block_start_idx_inclusive);
376        // seek_idx will update the current block iter state
377        self.seek_idx(self.block_start_idx_inclusive, None).await?;
378        Ok(())
379    }
380
381    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
382        let block_idx = self.calculate_block_idx_by_key(key);
383        self.init_block_prefetch_range(block_idx);
384
385        self.seek_idx(block_idx, Some(key)).await?;
386        if !self.is_valid() {
387            // seek to next block
388            self.seek_idx(block_idx + 1, None).await?;
389        }
390        Ok(())
391    }
392
393    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
394        stats.add(&self.stats);
395    }
396
397    fn value_meta(&self) -> ValueMeta {
398        ValueMeta {
399            object_id: Some(self.sst.id),
400            block_id: Some(self.cur_idx as _),
401        }
402    }
403}
404
405impl SstableIteratorType for SstableIterator {
406    fn create(
407        sstable: TableHolder,
408        sstable_store: SstableStoreRef,
409        options: Arc<SstableIteratorReadOptions>,
410        sstable_info_ref: &SstableInfo,
411    ) -> Self {
412        SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use std::collections::Bound;
419
420    use bytes::Bytes;
421    use foyer::Hint;
422    use itertools::Itertools;
423    use rand::prelude::*;
424    use rand::rng as thread_rng;
425    use risingwave_common::catalog::TableId;
426    use risingwave_common::hash::VirtualNode;
427    use risingwave_common::util::epoch::test_epoch;
428    use risingwave_hummock_sdk::EpochWithGap;
429    use risingwave_hummock_sdk::key::{TableKey, UserKey};
430    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
431
432    use super::*;
433    use crate::assert_bytes_eq;
434    use crate::hummock::CachePolicy;
435    use crate::hummock::iterator::test_utils::mock_sstable_store;
436    use crate::hummock::test_utils::{
437        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
438        gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
439    };
440
441    async fn inner_test_forward_iterator(
442        sstable_store: SstableStoreRef,
443        handle: TableHolder,
444        sstable_info: SstableInfo,
445    ) {
446        // We should have at least 10 blocks, so that sstable iterator test could cover more code
447        // path.
448        let mut sstable_iter = SstableIterator::create(
449            handle,
450            sstable_store,
451            Arc::new(SstableIteratorReadOptions::default()),
452            &sstable_info,
453        );
454        let mut cnt = 0;
455        sstable_iter.rewind().await.unwrap();
456
457        while sstable_iter.is_valid() {
458            let key = sstable_iter.key();
459            let value = sstable_iter.value();
460            assert_eq!(key, test_key_of(cnt).to_ref());
461            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
462            cnt += 1;
463            sstable_iter.next().await.unwrap();
464        }
465
466        assert_eq!(cnt, TEST_KEYS_COUNT);
467    }
468
469    #[tokio::test]
470    async fn test_table_iterator() {
471        // Build remote sstable
472        let sstable_store = mock_sstable_store().await;
473        let (sstable, sstable_info) =
474            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
475                .await;
476        // We should have at least 10 blocks, so that sstable iterator test could cover more code
477        // path.
478        assert!(sstable.meta.block_metas.len() > 10);
479
480        inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
481    }
482
483    #[tokio::test]
484    async fn test_table_seek() {
485        let sstable_store = mock_sstable_store().await;
486        let (sstable, sstable_info) =
487            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
488                .await;
489        // We should have at least 10 blocks, so that sstable iterator test could cover more code
490        // path.
491        assert!(sstable.meta.block_metas.len() > 10);
492        let mut sstable_iter = SstableIterator::create(
493            sstable,
494            sstable_store,
495            Arc::new(SstableIteratorReadOptions::default()),
496            &sstable_info,
497        );
498        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
499        let mut rng = thread_rng();
500        all_key_to_test.shuffle(&mut rng);
501
502        // We seek and access all the keys in random order
503        for i in all_key_to_test {
504            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
505            // sstable_iter.next().await.unwrap();
506            let key = sstable_iter.key();
507            assert_eq!(key, test_key_of(i).to_ref());
508        }
509
510        // Seek to key #500 and start iterating.
511        sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
512        for i in 500..TEST_KEYS_COUNT {
513            let key = sstable_iter.key();
514            assert_eq!(key, test_key_of(i).to_ref());
515            sstable_iter.next().await.unwrap();
516        }
517        assert!(!sstable_iter.is_valid());
518
519        // Seek to < first key
520        let smallest_key = FullKey::for_test(
521            TableId::default(),
522            [
523                VirtualNode::ZERO.to_be_bytes().as_slice(),
524                format!("key_aaaa_{:05}", 0).as_bytes(),
525            ]
526            .concat(),
527            test_epoch(233),
528        );
529        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
530        let key = sstable_iter.key();
531        assert_eq!(key, test_key_of(0).to_ref());
532
533        // Seek to > last key
534        let largest_key = FullKey::for_test(
535            TableId::default(),
536            [
537                VirtualNode::ZERO.to_be_bytes().as_slice(),
538                format!("key_zzzz_{:05}", 0).as_bytes(),
539            ]
540            .concat(),
541            test_epoch(233),
542        );
543        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
544        assert!(!sstable_iter.is_valid());
545
546        // Seek to non-existing key
547        for idx in 1..TEST_KEYS_COUNT {
548            // Seek to the previous key of each existing key. e.g.,
549            // Our key space is `key_test_00000`, `key_test_00002`, `key_test_00004`, ...
550            // And we seek to `key_test_00001` (will produce `key_test_00002`), `key_test_00003`
551            // (will produce `key_test_00004`).
552            sstable_iter
553                .seek(
554                    FullKey::for_test(
555                        TableId::default(),
556                        [
557                            VirtualNode::ZERO.to_be_bytes().as_slice(),
558                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
559                        ]
560                        .concat(),
561                        0,
562                    )
563                    .to_ref(),
564                )
565                .await
566                .unwrap();
567
568            let key = sstable_iter.key();
569            assert_eq!(key, test_key_of(idx).to_ref());
570            sstable_iter.next().await.unwrap();
571        }
572        assert!(!sstable_iter.is_valid());
573    }
574
575    #[tokio::test]
576    async fn test_prefetch_table_read() {
577        let sstable_store = mock_sstable_store().await;
578        // when upload data is successful, but upload meta is fail and delete is fail
579        let kv_iter =
580            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
581        let sst_info = gen_test_sstable_info(
582            default_builder_opt_for_test(),
583            0,
584            kv_iter,
585            sstable_store.clone(),
586        )
587        .await;
588
589        let end_key = test_key_of(TEST_KEYS_COUNT);
590        let uk = UserKey::new(
591            end_key.user_key.table_id,
592            TableKey(Bytes::from(end_key.user_key.table_key.0)),
593        );
594        let options = Arc::new(SstableIteratorReadOptions {
595            cache_policy: CachePolicy::Fill(Hint::Normal),
596            must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
597            max_preload_retry_times: 0,
598            prefetch_for_large_query: false,
599        });
600        let mut stats = StoreLocalStatistic::default();
601        let mut sstable_iter = SstableIterator::create(
602            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
603            sstable_store.clone(),
604            options.clone(),
605            &sst_info,
606        );
607        let mut cnt = 1000;
608        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
609        while sstable_iter.is_valid() {
610            let key = sstable_iter.key();
611            let value = sstable_iter.value();
612            assert_eq!(
613                key,
614                test_key_of(cnt).to_ref(),
615                "fail at {}, get key :{:?}",
616                cnt,
617                String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
618            );
619            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
620            cnt += 1;
621            sstable_iter.next().await.unwrap();
622        }
623        assert_eq!(cnt, TEST_KEYS_COUNT);
624        let mut sstable_iter = SstableIterator::create(
625            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
626            sstable_store,
627            options.clone(),
628            &sst_info,
629        );
630        let mut cnt = 1000;
631        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
632        while sstable_iter.is_valid() {
633            let key = sstable_iter.key();
634            let value = sstable_iter.value();
635            assert_eq!(key, test_key_of(cnt).to_ref());
636            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
637            cnt += 1;
638            sstable_iter.next().await.unwrap();
639        }
640        assert_eq!(cnt, TEST_KEYS_COUNT);
641    }
642
643    #[tokio::test]
644    async fn test_read_table_id_range() {
645        {
646            let sstable_store = mock_sstable_store().await;
647            let (sstable, sstable_info) =
648                gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
649                    .await;
650            let mut sstable_iter = SstableIterator::create(
651                sstable,
652                sstable_store.clone(),
653                Arc::new(SstableIteratorReadOptions::default()),
654                &sstable_info,
655            );
656            sstable_iter.rewind().await.unwrap();
657            assert!(sstable_iter.is_valid());
658            assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
659        }
660
661        {
662            let sstable_store = mock_sstable_store().await;
663            // test key_range right
664            let k1 = {
665                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
666                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
667                let uk = UserKey::for_test(TableId::from(1), table_key);
668                FullKey {
669                    user_key: uk,
670                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
671                }
672            };
673
674            let k2 = {
675                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
676                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
677                let uk = UserKey::for_test(TableId::from(2), table_key);
678                FullKey {
679                    user_key: uk,
680                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
681                }
682            };
683
684            let k3 = {
685                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
686                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
687                let uk = UserKey::for_test(TableId::from(3), table_key);
688                FullKey {
689                    user_key: uk,
690                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
691                }
692            };
693
694            {
695                let kv_pairs = vec![
696                    (k1.clone(), HummockValue::put(test_value_of(1))),
697                    (k2.clone(), HummockValue::put(test_value_of(2))),
698                    (k3.clone(), HummockValue::put(test_value_of(3))),
699                ];
700
701                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
702                    default_builder_opt_for_test(),
703                    10,
704                    kv_pairs.into_iter(),
705                    sstable_store.clone(),
706                    vec![1, 2, 3],
707                )
708                .await;
709                let mut sstable_iter = SstableIterator::create(
710                    sstable,
711                    sstable_store.clone(),
712                    Arc::new(SstableIteratorReadOptions::default()),
713                    &SstableInfo::from(SstableInfoInner {
714                        table_ids: vec![1.into(), 2.into(), 3.into()],
715                        ..Default::default()
716                    }),
717                );
718                sstable_iter.rewind().await.unwrap();
719                assert!(sstable_iter.is_valid());
720                assert!(sstable_iter.key().eq(&k1.to_ref()));
721
722                let mut cnt = 0;
723                let mut last_key = k1.clone();
724                while sstable_iter.is_valid() {
725                    last_key = sstable_iter.key().to_vec();
726                    cnt += 1;
727                    sstable_iter.next().await.unwrap();
728                }
729
730                assert_eq!(3, cnt);
731                assert_eq!(last_key, k3.clone());
732            }
733
734            {
735                let kv_pairs = vec![
736                    (k1.clone(), HummockValue::put(test_value_of(1))),
737                    (k2.clone(), HummockValue::put(test_value_of(2))),
738                    (k3.clone(), HummockValue::put(test_value_of(3))),
739                ];
740
741                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
742                    default_builder_opt_for_test(),
743                    10,
744                    kv_pairs.into_iter(),
745                    sstable_store.clone(),
746                    vec![1, 2, 3],
747                )
748                .await;
749
750                let mut sstable_iter = SstableIterator::create(
751                    sstable,
752                    sstable_store.clone(),
753                    Arc::new(SstableIteratorReadOptions::default()),
754                    &SstableInfo::from(SstableInfoInner {
755                        table_ids: vec![1.into(), 2.into()],
756                        ..Default::default()
757                    }),
758                );
759                sstable_iter.rewind().await.unwrap();
760                assert!(sstable_iter.is_valid());
761                assert!(sstable_iter.key().eq(&k1.to_ref()));
762
763                let mut cnt = 0;
764                let mut last_key = k1.clone();
765                while sstable_iter.is_valid() {
766                    last_key = sstable_iter.key().to_vec();
767                    cnt += 1;
768                    sstable_iter.next().await.unwrap();
769                }
770
771                assert_eq!(2, cnt);
772                assert_eq!(last_key, k2.clone());
773            }
774
775            {
776                let kv_pairs = vec![
777                    (k1.clone(), HummockValue::put(test_value_of(1))),
778                    (k2.clone(), HummockValue::put(test_value_of(2))),
779                    (k3.clone(), HummockValue::put(test_value_of(3))),
780                ];
781
782                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
783                    default_builder_opt_for_test(),
784                    10,
785                    kv_pairs.into_iter(),
786                    sstable_store.clone(),
787                    vec![1, 2, 3],
788                )
789                .await;
790
791                let mut sstable_iter = SstableIterator::create(
792                    sstable,
793                    sstable_store.clone(),
794                    Arc::new(SstableIteratorReadOptions::default()),
795                    &SstableInfo::from(SstableInfoInner {
796                        table_ids: vec![2.into(), 3.into()],
797                        ..Default::default()
798                    }),
799                );
800                sstable_iter.rewind().await.unwrap();
801                assert!(sstable_iter.is_valid());
802                assert!(sstable_iter.key().eq(&k2.to_ref()));
803
804                let mut cnt = 0;
805                let mut last_key = k1.clone();
806                while sstable_iter.is_valid() {
807                    last_key = sstable_iter.key().to_vec();
808                    cnt += 1;
809                    sstable_iter.next().await.unwrap();
810                }
811
812                assert_eq!(2, cnt);
813                assert_eq!(last_key, k3.clone());
814            }
815
816            {
817                let kv_pairs = vec![
818                    (k1.clone(), HummockValue::put(test_value_of(1))),
819                    (k2.clone(), HummockValue::put(test_value_of(2))),
820                    (k3.clone(), HummockValue::put(test_value_of(3))),
821                ];
822
823                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
824                    default_builder_opt_for_test(),
825                    10,
826                    kv_pairs.into_iter(),
827                    sstable_store.clone(),
828                    vec![1, 2, 3],
829                )
830                .await;
831
832                let mut sstable_iter = SstableIterator::create(
833                    sstable,
834                    sstable_store.clone(),
835                    Arc::new(SstableIteratorReadOptions::default()),
836                    &SstableInfo::from(SstableInfoInner {
837                        table_ids: vec![2.into()],
838                        ..Default::default()
839                    }),
840                );
841                sstable_iter.rewind().await.unwrap();
842                assert!(sstable_iter.is_valid());
843                assert!(sstable_iter.key().eq(&k2.to_ref()));
844
845                let mut cnt = 0;
846                let mut last_key = k1.clone();
847                while sstable_iter.is_valid() {
848                    last_key = sstable_iter.key().to_vec();
849                    cnt += 1;
850                    sstable_iter.next().await.unwrap();
851                }
852
853                assert_eq!(1, cnt);
854                assert_eq!(last_key, k2.clone());
855            }
856        }
857    }
858}