1use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
/// Constructor interface shared by iterators that read a single SSTable.
///
/// Implementors are [`HummockIterator`]s that can be built from an SSTable handle, the store
/// used to load its blocks, a set of read options and the [`SstableInfo`] describing which part
/// of the SSTable should be read.
pub trait SstableIteratorType: HummockIterator + 'static {
    /// Builds an iterator over `sstable`.
    ///
    /// * `sstable` - handle to the SSTable to iterate over.
    /// * `sstable_store` - store handed to the iterator for loading blocks.
    /// * `read_options` - shared read options for the iteration.
    /// * `sstable_info_ref` - metadata of the SSTable being read.
    fn create(
        sstable: TableHolder,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
        sstable_info_ref: &SstableInfo,
    ) -> Self;
}
38
/// Forward iterator over the key-value pairs of a single SSTable.
///
/// Iteration is restricted to the contiguous block range
/// `block_start_idx_inclusive..=block_end_idx_inclusive`, which `new` derives from the table ids
/// listed in the `SstableInfo` it is given.
pub struct SstableIterator {
    /// Iterator over the block currently loaded, or `None` when no block is loaded (before the
    /// first seek, or after seeking past `block_end_idx_inclusive`).
    block_iter: Option<BlockIterator>,

    /// Index of the block most recently loaded by `seek_idx`.
    cur_idx: usize,

    /// Stream of prefetched blocks, if one is currently open.
    preload_stream: Option<Box<dyn BlockStream>>,
    /// Handle to the SSTable being read.
    pub sst: TableHolder,
    /// End block index of the prefetch range, as passed to `prefetch_blocks`; `0` when no
    /// prefetch range has been computed.
    preload_end_block_idx: usize,
    /// Number of times the prefetch stream has been re-created after a read error; compared
    /// against `options.max_preload_retry_times`.
    preload_retry_times: usize,

    /// Store used to fetch and prefetch blocks.
    sstable_store: SstableStoreRef,
    /// Statistics accumulated locally by this iterator.
    stats: StoreLocalStatistic,
    /// Read options (cache policy, prefetch bound, retry limit) shared with the caller.
    options: Arc<SstableIteratorReadOptions>,

    /// Index of the first block this iterator may read.
    block_start_idx_inclusive: usize,
    /// Index of the last block this iterator may read.
    block_end_idx_inclusive: usize,
}
61
62impl SstableIterator {
63 pub fn new(
64 sstable: TableHolder,
65 sstable_store: SstableStoreRef,
66 options: Arc<SstableIteratorReadOptions>,
67 sstable_info_ref: &SstableInfo,
68 ) -> Self {
69 let mut block_start_idx_inclusive = 0;
70 let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71 let read_table_id_range = (
72 *sstable_info_ref.table_ids.first().unwrap(),
73 *sstable_info_ref.table_ids.last().unwrap(),
74 );
75 assert!(
76 read_table_id_range.0 <= read_table_id_range.1,
77 "invalid table id range {} - {}",
78 read_table_id_range.0,
79 read_table_id_range.1
80 );
81 let block_meta_count = sstable.meta.block_metas.len();
82 assert!(block_meta_count > 0);
83 assert!(
84 sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
85 "table id {} not found table_ids in block_meta {:?}",
86 read_table_id_range.0,
87 sstable
88 .meta
89 .block_metas
90 .iter()
91 .map(|meta| meta.table_id())
92 .collect::<Vec<_>>()
93 );
94 assert!(
95 sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
96 "table id {} not found table_ids in block_meta {:?}",
97 read_table_id_range.1,
98 sstable
99 .meta
100 .block_metas
101 .iter()
102 .map(|meta| meta.table_id())
103 .collect::<Vec<_>>()
104 );
105
106 while block_start_idx_inclusive < block_meta_count
107 && sstable.meta.block_metas[block_start_idx_inclusive].table_id()
108 < read_table_id_range.0
109 {
110 block_start_idx_inclusive += 1;
111 }
112 assert!(
114 block_start_idx_inclusive < block_meta_count,
115 "table id {} not found table_ids in block_meta {:?}",
116 read_table_id_range.0,
117 sstable
118 .meta
119 .block_metas
120 .iter()
121 .map(|meta| meta.table_id())
122 .collect::<Vec<_>>()
123 );
124
125 while block_end_idx_inclusive > block_start_idx_inclusive
126 && sstable.meta.block_metas[block_end_idx_inclusive].table_id() > read_table_id_range.1
127 {
128 block_end_idx_inclusive -= 1;
129 }
130 assert!(
131 block_end_idx_inclusive >= block_start_idx_inclusive,
132 "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
133 block_end_idx_inclusive,
134 block_start_idx_inclusive,
135 block_meta_count
136 );
137
138 Self {
139 block_iter: None,
140 cur_idx: 0,
141 preload_stream: None,
142 sst: sstable,
143 sstable_store,
144 stats: StoreLocalStatistic::default(),
145 options,
146 preload_end_block_idx: 0,
147 preload_retry_times: 0,
148 block_start_idx_inclusive,
149 block_end_idx_inclusive,
150 }
151 }
152
153 fn init_block_prefetch_range(&mut self, start_idx: usize) {
154 assert!(
155 start_idx >= self.block_start_idx_inclusive
156 && start_idx <= self.block_end_idx_inclusive
157 );
158
159 self.preload_end_block_idx = 0;
160 if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
161 let block_metas = &self.sst.meta.block_metas
162 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
163 let next_to_start_idx = start_idx + 1;
164 if next_to_start_idx <= self.block_end_idx_inclusive {
165 let end_idx = match bound {
166 Unbounded => self.block_end_idx_inclusive + 1,
167 Included(dest_key) => {
168 let dest_key = dest_key.as_ref();
169 self.block_start_idx_inclusive
170 + block_metas.partition_point(|block_meta| {
171 FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
172 })
173 }
174 Excluded(end_key) => {
175 let end_key = end_key.as_ref();
176 self.block_start_idx_inclusive
177 + block_metas.partition_point(|block_meta| {
178 FullKey::decode(&block_meta.smallest_key).user_key < end_key
179 })
180 }
181 };
182
183 if next_to_start_idx < end_idx {
185 assert!(
186 end_idx <= self.block_end_idx_inclusive + 1,
187 "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
188 end_idx,
189 self.block_end_idx_inclusive,
190 next_to_start_idx
191 );
192 self.preload_end_block_idx = end_idx;
193 }
194 }
195 }
196 }
197
198 async fn seek_idx(
200 &mut self,
201 idx: usize,
202 seek_key: Option<FullKey<&[u8]>>,
203 ) -> HummockResult<()> {
204 tracing::debug!(
205 target: "events::storage::sstable::block_seek",
206 "table iterator seek: sstable_object_id = {}, block_id = {}",
207 self.sst.id,
208 idx,
209 );
210
211 tokio::task::consume_budget().await;
215
216 let mut hit_cache = false;
217 if idx > self.block_end_idx_inclusive {
218 self.block_iter = None;
219 return Ok(());
220 }
221 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
223 match self
224 .sstable_store
225 .prefetch_blocks(
226 &self.sst,
227 idx,
228 self.preload_end_block_idx,
229 self.options.cache_policy,
230 &mut self.stats,
231 )
232 .instrument_await("prefetch_blocks".verbose())
233 .await
234 {
235 Ok(preload_stream) => self.preload_stream = Some(preload_stream),
236 Err(e) => {
237 tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
238 }
239 }
240 }
241
242 if self
243 .preload_stream
244 .as_ref()
245 .map(|preload_stream| preload_stream.next_block_index() <= idx)
246 .unwrap_or(false)
247 {
248 while let Some(preload_stream) = self.preload_stream.as_mut() {
249 let mut ret = Ok(());
250 while preload_stream.next_block_index() < idx {
251 if let Err(e) = preload_stream.next_block().await {
252 ret = Err(e);
253 break;
254 }
255 }
256 assert_eq!(preload_stream.next_block_index(), idx);
257 if ret.is_ok() {
258 match preload_stream.next_block().await {
259 Ok(Some(block)) => {
260 hit_cache = true;
261 self.block_iter = Some(BlockIterator::new(block));
262 break;
263 }
264 Ok(None) => {
265 self.preload_stream.take();
266 }
267 Err(e) => {
268 self.preload_stream.take();
269 ret = Err(e);
270 }
271 }
272 } else {
273 self.preload_stream.take();
274 }
275 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
276 if let Err(e) = ret {
277 tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
278 if self.preload_retry_times >= self.options.max_preload_retry_times {
279 break;
280 }
281 self.preload_retry_times += 1;
282 }
283
284 match self
285 .sstable_store
286 .prefetch_blocks(
287 &self.sst,
288 idx,
289 self.preload_end_block_idx,
290 self.options.cache_policy,
291 &mut self.stats,
292 )
293 .instrument_await("prefetch_blocks".verbose())
294 .await
295 {
296 Ok(stream) => {
297 self.preload_stream = Some(stream);
298 }
299 Err(e) => {
300 tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
301 break;
302 }
303 }
304 }
305 }
306 }
307 if !hit_cache {
308 let block = self
309 .sstable_store
310 .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
311 .await?;
312 self.block_iter = Some(BlockIterator::new(block));
313 };
314 let block_iter = self.block_iter.as_mut().unwrap();
315 if let Some(key) = seek_key {
316 block_iter.seek(key);
317 } else {
318 block_iter.seek_to_first();
319 }
320
321 self.cur_idx = idx;
322
323 Ok(())
324 }
325
326 fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
327 self.block_start_idx_inclusive
328 + self.sst.meta.block_metas
329 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
330 .partition_point(|block_meta| {
331 FullKey::decode(&block_meta.smallest_key).le(&key)
335 })
336 .saturating_sub(1) }
338}
339
340impl HummockIterator for SstableIterator {
341 type Direction = Forward;
342
343 async fn next(&mut self) -> HummockResult<()> {
344 self.stats.total_key_count += 1;
345 let block_iter = self.block_iter.as_mut().expect("no block iter");
346 if !block_iter.try_next() {
347 self.seek_idx(self.cur_idx + 1, None).await?;
349 }
350
351 Ok(())
352 }
353
354 fn key(&self) -> FullKey<&[u8]> {
355 self.block_iter.as_ref().expect("no block iter").key()
356 }
357
358 fn value(&self) -> HummockValue<&[u8]> {
359 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
360
361 HummockValue::from_slice(raw_value).expect("decode error")
362 }
363
364 fn is_valid(&self) -> bool {
365 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
366 }
367
368 async fn rewind(&mut self) -> HummockResult<()> {
369 self.init_block_prefetch_range(self.block_start_idx_inclusive);
370 self.seek_idx(self.block_start_idx_inclusive, None).await?;
372 Ok(())
373 }
374
375 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
376 let block_idx = self.calculate_block_idx_by_key(key);
377 self.init_block_prefetch_range(block_idx);
378
379 self.seek_idx(block_idx, Some(key)).await?;
380 if !self.is_valid() {
381 self.seek_idx(block_idx + 1, None).await?;
383 }
384 Ok(())
385 }
386
387 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
388 stats.add(&self.stats);
389 }
390
391 fn value_meta(&self) -> ValueMeta {
392 ValueMeta {
393 object_id: Some(self.sst.id),
394 block_id: Some(self.cur_idx as _),
395 }
396 }
397}
398
399impl SstableIteratorType for SstableIterator {
400 fn create(
401 sstable: TableHolder,
402 sstable_store: SstableStoreRef,
403 options: Arc<SstableIteratorReadOptions>,
404 sstable_info_ref: &SstableInfo,
405 ) -> Self {
406 SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
407 }
408}
409
410#[cfg(test)]
411mod tests {
412 use std::collections::Bound;
413
414 use bytes::Bytes;
415 use foyer::Hint;
416 use itertools::Itertools;
417 use rand::prelude::*;
418 use rand::rng as thread_rng;
419 use risingwave_common::catalog::TableId;
420 use risingwave_common::hash::VirtualNode;
421 use risingwave_common::util::epoch::test_epoch;
422 use risingwave_hummock_sdk::EpochWithGap;
423 use risingwave_hummock_sdk::key::{TableKey, UserKey};
424 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
425
426 use super::*;
427 use crate::assert_bytes_eq;
428 use crate::hummock::CachePolicy;
429 use crate::hummock::iterator::test_utils::mock_sstable_store;
430 use crate::hummock::test_utils::{
431 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
432 gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
433 };
434
435 async fn inner_test_forward_iterator(
436 sstable_store: SstableStoreRef,
437 handle: TableHolder,
438 sstable_info: SstableInfo,
439 ) {
440 let mut sstable_iter = SstableIterator::create(
443 handle,
444 sstable_store,
445 Arc::new(SstableIteratorReadOptions::default()),
446 &sstable_info,
447 );
448 let mut cnt = 0;
449 sstable_iter.rewind().await.unwrap();
450
451 while sstable_iter.is_valid() {
452 let key = sstable_iter.key();
453 let value = sstable_iter.value();
454 assert_eq!(key, test_key_of(cnt).to_ref());
455 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
456 cnt += 1;
457 sstable_iter.next().await.unwrap();
458 }
459
460 assert_eq!(cnt, TEST_KEYS_COUNT);
461 }
462
463 #[tokio::test]
464 async fn test_table_iterator() {
465 let sstable_store = mock_sstable_store().await;
467 let (sstable, sstable_info) =
468 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
469 .await;
470 assert!(sstable.meta.block_metas.len() > 10);
473
474 inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
475 }
476
477 #[tokio::test]
478 async fn test_table_seek() {
479 let sstable_store = mock_sstable_store().await;
480 let (sstable, sstable_info) =
481 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
482 .await;
483 assert!(sstable.meta.block_metas.len() > 10);
486 let mut sstable_iter = SstableIterator::create(
487 sstable,
488 sstable_store,
489 Arc::new(SstableIteratorReadOptions::default()),
490 &sstable_info,
491 );
492 let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
493 let mut rng = thread_rng();
494 all_key_to_test.shuffle(&mut rng);
495
496 for i in all_key_to_test {
498 sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
499 let key = sstable_iter.key();
501 assert_eq!(key, test_key_of(i).to_ref());
502 }
503
504 sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
506 for i in 500..TEST_KEYS_COUNT {
507 let key = sstable_iter.key();
508 assert_eq!(key, test_key_of(i).to_ref());
509 sstable_iter.next().await.unwrap();
510 }
511 assert!(!sstable_iter.is_valid());
512
513 let smallest_key = FullKey::for_test(
515 TableId::default(),
516 [
517 VirtualNode::ZERO.to_be_bytes().as_slice(),
518 format!("key_aaaa_{:05}", 0).as_bytes(),
519 ]
520 .concat(),
521 test_epoch(233),
522 );
523 sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
524 let key = sstable_iter.key();
525 assert_eq!(key, test_key_of(0).to_ref());
526
527 let largest_key = FullKey::for_test(
529 TableId::default(),
530 [
531 VirtualNode::ZERO.to_be_bytes().as_slice(),
532 format!("key_zzzz_{:05}", 0).as_bytes(),
533 ]
534 .concat(),
535 test_epoch(233),
536 );
537 sstable_iter.seek(largest_key.to_ref()).await.unwrap();
538 assert!(!sstable_iter.is_valid());
539
540 for idx in 1..TEST_KEYS_COUNT {
542 sstable_iter
547 .seek(
548 FullKey::for_test(
549 TableId::default(),
550 [
551 VirtualNode::ZERO.to_be_bytes().as_slice(),
552 format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
553 ]
554 .concat(),
555 0,
556 )
557 .to_ref(),
558 )
559 .await
560 .unwrap();
561
562 let key = sstable_iter.key();
563 assert_eq!(key, test_key_of(idx).to_ref());
564 sstable_iter.next().await.unwrap();
565 }
566 assert!(!sstable_iter.is_valid());
567 }
568
569 #[tokio::test]
570 async fn test_prefetch_table_read() {
571 let sstable_store = mock_sstable_store().await;
572 let kv_iter =
574 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
575 let sst_info = gen_test_sstable_info(
576 default_builder_opt_for_test(),
577 0,
578 kv_iter,
579 sstable_store.clone(),
580 )
581 .await;
582
583 let end_key = test_key_of(TEST_KEYS_COUNT);
584 let uk = UserKey::new(
585 end_key.user_key.table_id,
586 TableKey(Bytes::from(end_key.user_key.table_key.0)),
587 );
588 let options = Arc::new(SstableIteratorReadOptions {
589 cache_policy: CachePolicy::Fill(Hint::Normal),
590 must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
591 max_preload_retry_times: 0,
592 prefetch_for_large_query: false,
593 });
594 let mut stats = StoreLocalStatistic::default();
595 let mut sstable_iter = SstableIterator::create(
596 sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
597 sstable_store.clone(),
598 options.clone(),
599 &sst_info,
600 );
601 let mut cnt = 1000;
602 sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
603 while sstable_iter.is_valid() {
604 let key = sstable_iter.key();
605 let value = sstable_iter.value();
606 assert_eq!(
607 key,
608 test_key_of(cnt).to_ref(),
609 "fail at {}, get key :{:?}",
610 cnt,
611 String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
612 );
613 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
614 cnt += 1;
615 sstable_iter.next().await.unwrap();
616 }
617 assert_eq!(cnt, TEST_KEYS_COUNT);
618 let mut sstable_iter = SstableIterator::create(
619 sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
620 sstable_store,
621 options.clone(),
622 &sst_info,
623 );
624 let mut cnt = 1000;
625 sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
626 while sstable_iter.is_valid() {
627 let key = sstable_iter.key();
628 let value = sstable_iter.value();
629 assert_eq!(key, test_key_of(cnt).to_ref());
630 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
631 cnt += 1;
632 sstable_iter.next().await.unwrap();
633 }
634 assert_eq!(cnt, TEST_KEYS_COUNT);
635 }
636
637 #[tokio::test]
638 async fn test_read_table_id_range() {
639 {
640 let sstable_store = mock_sstable_store().await;
641 let (sstable, sstable_info) =
642 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
643 .await;
644 let mut sstable_iter = SstableIterator::create(
645 sstable,
646 sstable_store.clone(),
647 Arc::new(SstableIteratorReadOptions::default()),
648 &sstable_info,
649 );
650 sstable_iter.rewind().await.unwrap();
651 assert!(sstable_iter.is_valid());
652 assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
653 }
654
655 {
656 let sstable_store = mock_sstable_store().await;
657 let k1 = {
659 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
660 table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
661 let uk = UserKey::for_test(TableId::from(1), table_key);
662 FullKey {
663 user_key: uk,
664 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
665 }
666 };
667
668 let k2 = {
669 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
670 table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
671 let uk = UserKey::for_test(TableId::from(2), table_key);
672 FullKey {
673 user_key: uk,
674 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
675 }
676 };
677
678 let k3 = {
679 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
680 table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
681 let uk = UserKey::for_test(TableId::from(3), table_key);
682 FullKey {
683 user_key: uk,
684 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
685 }
686 };
687
688 {
689 let kv_pairs = vec![
690 (k1.clone(), HummockValue::put(test_value_of(1))),
691 (k2.clone(), HummockValue::put(test_value_of(2))),
692 (k3.clone(), HummockValue::put(test_value_of(3))),
693 ];
694
695 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
696 default_builder_opt_for_test(),
697 10,
698 kv_pairs.into_iter(),
699 sstable_store.clone(),
700 vec![1, 2, 3],
701 )
702 .await;
703 let mut sstable_iter = SstableIterator::create(
704 sstable,
705 sstable_store.clone(),
706 Arc::new(SstableIteratorReadOptions::default()),
707 &SstableInfo::from(SstableInfoInner {
708 table_ids: vec![1.into(), 2.into(), 3.into()],
709 ..Default::default()
710 }),
711 );
712 sstable_iter.rewind().await.unwrap();
713 assert!(sstable_iter.is_valid());
714 assert!(sstable_iter.key().eq(&k1.to_ref()));
715
716 let mut cnt = 0;
717 let mut last_key = k1.clone();
718 while sstable_iter.is_valid() {
719 last_key = sstable_iter.key().to_vec();
720 cnt += 1;
721 sstable_iter.next().await.unwrap();
722 }
723
724 assert_eq!(3, cnt);
725 assert_eq!(last_key, k3.clone());
726 }
727
728 {
729 let kv_pairs = vec![
730 (k1.clone(), HummockValue::put(test_value_of(1))),
731 (k2.clone(), HummockValue::put(test_value_of(2))),
732 (k3.clone(), HummockValue::put(test_value_of(3))),
733 ];
734
735 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
736 default_builder_opt_for_test(),
737 10,
738 kv_pairs.into_iter(),
739 sstable_store.clone(),
740 vec![1, 2, 3],
741 )
742 .await;
743
744 let mut sstable_iter = SstableIterator::create(
745 sstable,
746 sstable_store.clone(),
747 Arc::new(SstableIteratorReadOptions::default()),
748 &SstableInfo::from(SstableInfoInner {
749 table_ids: vec![1.into(), 2.into()],
750 ..Default::default()
751 }),
752 );
753 sstable_iter.rewind().await.unwrap();
754 assert!(sstable_iter.is_valid());
755 assert!(sstable_iter.key().eq(&k1.to_ref()));
756
757 let mut cnt = 0;
758 let mut last_key = k1.clone();
759 while sstable_iter.is_valid() {
760 last_key = sstable_iter.key().to_vec();
761 cnt += 1;
762 sstable_iter.next().await.unwrap();
763 }
764
765 assert_eq!(2, cnt);
766 assert_eq!(last_key, k2.clone());
767 }
768
769 {
770 let kv_pairs = vec![
771 (k1.clone(), HummockValue::put(test_value_of(1))),
772 (k2.clone(), HummockValue::put(test_value_of(2))),
773 (k3.clone(), HummockValue::put(test_value_of(3))),
774 ];
775
776 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
777 default_builder_opt_for_test(),
778 10,
779 kv_pairs.into_iter(),
780 sstable_store.clone(),
781 vec![1, 2, 3],
782 )
783 .await;
784
785 let mut sstable_iter = SstableIterator::create(
786 sstable,
787 sstable_store.clone(),
788 Arc::new(SstableIteratorReadOptions::default()),
789 &SstableInfo::from(SstableInfoInner {
790 table_ids: vec![2.into(), 3.into()],
791 ..Default::default()
792 }),
793 );
794 sstable_iter.rewind().await.unwrap();
795 assert!(sstable_iter.is_valid());
796 assert!(sstable_iter.key().eq(&k2.to_ref()));
797
798 let mut cnt = 0;
799 let mut last_key = k1.clone();
800 while sstable_iter.is_valid() {
801 last_key = sstable_iter.key().to_vec();
802 cnt += 1;
803 sstable_iter.next().await.unwrap();
804 }
805
806 assert_eq!(2, cnt);
807 assert_eq!(last_key, k3.clone());
808 }
809
810 {
811 let kv_pairs = vec![
812 (k1.clone(), HummockValue::put(test_value_of(1))),
813 (k2.clone(), HummockValue::put(test_value_of(2))),
814 (k3.clone(), HummockValue::put(test_value_of(3))),
815 ];
816
817 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
818 default_builder_opt_for_test(),
819 10,
820 kv_pairs.into_iter(),
821 sstable_store.clone(),
822 vec![1, 2, 3],
823 )
824 .await;
825
826 let mut sstable_iter = SstableIterator::create(
827 sstable,
828 sstable_store.clone(),
829 Arc::new(SstableIteratorReadOptions::default()),
830 &SstableInfo::from(SstableInfoInner {
831 table_ids: vec![2.into()],
832 ..Default::default()
833 }),
834 );
835 sstable_iter.rewind().await.unwrap();
836 assert!(sstable_iter.is_valid());
837 assert!(sstable_iter.key().eq(&k2.to_ref()));
838
839 let mut cnt = 0;
840 let mut last_key = k1.clone();
841 while sstable_iter.is_valid() {
842 last_key = sstable_iter.key().to_vec();
843 cnt += 1;
844 sstable_iter.next().await.unwrap();
845 }
846
847 assert_eq!(1, cnt);
848 assert_eq!(last_key, k2.clone());
849 }
850 }
851 }
852}