1use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
/// Constructor abstraction for iterators that read a single sstable.
///
/// Every implementor is a [`HummockIterator`] that can be built from the four
/// ingredients passed to [`SstableIteratorType::create`].
pub trait SstableIteratorType: HummockIterator + 'static {
    /// Builds an iterator over `sstable`.
    ///
    /// * `sstable` - holder of the sstable to iterate.
    /// * `sstable_store` - store handed to the iterator for fetching blocks.
    /// * `read_options` - shared read options for the iterator.
    /// * `sstable_info_ref` - metadata describing the sstable being read.
    fn create(
        sstable: TableHolder,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
        sstable_info_ref: &SstableInfo,
    ) -> Self;
}
38
/// Forward iterator over the key-value pairs of one sstable, reading it block by block.
pub struct SstableIterator {
    /// Iterator over the block that is currently loaded. `None` until the first seek and
    /// after seeking past the last readable block.
    block_iter: Option<BlockIterator>,

    /// Index of the block that `block_iter` currently points into.
    cur_idx: usize,

    /// Stream of prefetched blocks, if one has been opened by `seek_idx`.
    preload_stream: Option<Box<dyn BlockStream>>,
    /// The sstable being iterated.
    pub sst: TableHolder,
    /// Exclusive end index of the block range to prefetch. `0` means that no prefetch
    /// range has been selected.
    preload_end_block_idx: usize,
    /// How many times the prefetch stream has been recreated after an error; bounded by
    /// `options.max_preload_retry_times`.
    preload_retry_times: usize,

    /// Store used to fetch and prefetch blocks.
    sstable_store: SstableStoreRef,
    /// Statistics accumulated by this iterator; exported via `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Read options shared with the creator of the iterator.
    options: Arc<SstableIteratorReadOptions>,

    /// Index of the first block this iterator is allowed to read (inclusive).
    block_start_idx_inclusive: usize,
    /// Index of the last block this iterator is allowed to read (inclusive).
    block_end_idx_inclusive: usize,
}
61
62impl SstableIterator {
63 pub fn new(
64 sstable: TableHolder,
65 sstable_store: SstableStoreRef,
66 options: Arc<SstableIteratorReadOptions>,
67 sstable_info_ref: &SstableInfo,
68 ) -> Self {
69 let mut block_start_idx_inclusive = 0;
70 let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71 let read_table_id_range = (
72 *sstable_info_ref.table_ids.first().unwrap(),
73 *sstable_info_ref.table_ids.last().unwrap(),
74 );
75 assert!(
76 read_table_id_range.0 <= read_table_id_range.1,
77 "invalid table id range {} - {}",
78 read_table_id_range.0,
79 read_table_id_range.1
80 );
81 let block_meta_count = sstable.meta.block_metas.len();
82 assert!(block_meta_count > 0);
83 assert!(
84 sstable.meta.block_metas[0].table_id().table_id() <= read_table_id_range.0,
85 "table id {} not found table_ids in block_meta {:?}",
86 read_table_id_range.0,
87 sstable
88 .meta
89 .block_metas
90 .iter()
91 .map(|meta| meta.table_id())
92 .collect::<Vec<_>>()
93 );
94 assert!(
95 sstable.meta.block_metas[block_meta_count - 1]
96 .table_id()
97 .table_id()
98 >= read_table_id_range.1,
99 "table id {} not found table_ids in block_meta {:?}",
100 read_table_id_range.1,
101 sstable
102 .meta
103 .block_metas
104 .iter()
105 .map(|meta| meta.table_id())
106 .collect::<Vec<_>>()
107 );
108
109 while block_start_idx_inclusive < block_meta_count
110 && sstable.meta.block_metas[block_start_idx_inclusive]
111 .table_id()
112 .table_id()
113 < read_table_id_range.0
114 {
115 block_start_idx_inclusive += 1;
116 }
117 assert!(
119 block_start_idx_inclusive < block_meta_count,
120 "table id {} not found table_ids in block_meta {:?}",
121 read_table_id_range.0,
122 sstable
123 .meta
124 .block_metas
125 .iter()
126 .map(|meta| meta.table_id())
127 .collect::<Vec<_>>()
128 );
129
130 while block_end_idx_inclusive > block_start_idx_inclusive
131 && sstable.meta.block_metas[block_end_idx_inclusive]
132 .table_id()
133 .table_id()
134 > read_table_id_range.1
135 {
136 block_end_idx_inclusive -= 1;
137 }
138 assert!(
139 block_end_idx_inclusive >= block_start_idx_inclusive,
140 "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
141 block_end_idx_inclusive,
142 block_start_idx_inclusive,
143 block_meta_count
144 );
145
146 Self {
147 block_iter: None,
148 cur_idx: 0,
149 preload_stream: None,
150 sst: sstable,
151 sstable_store,
152 stats: StoreLocalStatistic::default(),
153 options,
154 preload_end_block_idx: 0,
155 preload_retry_times: 0,
156 block_start_idx_inclusive,
157 block_end_idx_inclusive,
158 }
159 }
160
161 fn init_block_prefetch_range(&mut self, start_idx: usize) {
162 assert!(
163 start_idx >= self.block_start_idx_inclusive
164 && start_idx <= self.block_end_idx_inclusive
165 );
166
167 self.preload_end_block_idx = 0;
168 if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
169 let block_metas = &self.sst.meta.block_metas
170 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
171 let next_to_start_idx = start_idx + 1;
172 if next_to_start_idx <= self.block_end_idx_inclusive {
173 let end_idx = match bound {
174 Unbounded => self.block_end_idx_inclusive + 1,
175 Included(dest_key) => {
176 let dest_key = dest_key.as_ref();
177 self.block_start_idx_inclusive
178 + block_metas.partition_point(|block_meta| {
179 FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
180 })
181 }
182 Excluded(end_key) => {
183 let end_key = end_key.as_ref();
184 self.block_start_idx_inclusive
185 + block_metas.partition_point(|block_meta| {
186 FullKey::decode(&block_meta.smallest_key).user_key < end_key
187 })
188 }
189 };
190
191 if next_to_start_idx < end_idx {
193 assert!(
194 end_idx <= self.block_end_idx_inclusive + 1,
195 "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
196 end_idx,
197 self.block_end_idx_inclusive,
198 next_to_start_idx
199 );
200 self.preload_end_block_idx = end_idx;
201 }
202 }
203 }
204 }
205
206 async fn seek_idx(
208 &mut self,
209 idx: usize,
210 seek_key: Option<FullKey<&[u8]>>,
211 ) -> HummockResult<()> {
212 tracing::debug!(
213 target: "events::storage::sstable::block_seek",
214 "table iterator seek: sstable_object_id = {}, block_id = {}",
215 self.sst.id,
216 idx,
217 );
218
219 tokio::task::consume_budget().await;
223
224 let mut hit_cache = false;
225 if idx > self.block_end_idx_inclusive {
226 self.block_iter = None;
227 return Ok(());
228 }
229 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
231 match self
232 .sstable_store
233 .prefetch_blocks(
234 &self.sst,
235 idx,
236 self.preload_end_block_idx,
237 self.options.cache_policy,
238 &mut self.stats,
239 )
240 .instrument_await("prefetch_blocks".verbose())
241 .await
242 {
243 Ok(preload_stream) => self.preload_stream = Some(preload_stream),
244 Err(e) => {
245 tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
246 }
247 }
248 }
249
250 if self
251 .preload_stream
252 .as_ref()
253 .map(|preload_stream| preload_stream.next_block_index() <= idx)
254 .unwrap_or(false)
255 {
256 while let Some(preload_stream) = self.preload_stream.as_mut() {
257 let mut ret = Ok(());
258 while preload_stream.next_block_index() < idx {
259 if let Err(e) = preload_stream.next_block().await {
260 ret = Err(e);
261 break;
262 }
263 }
264 assert_eq!(preload_stream.next_block_index(), idx);
265 if ret.is_ok() {
266 match preload_stream.next_block().await {
267 Ok(Some(block)) => {
268 hit_cache = true;
269 self.block_iter = Some(BlockIterator::new(block));
270 break;
271 }
272 Ok(None) => {
273 self.preload_stream.take();
274 }
275 Err(e) => {
276 self.preload_stream.take();
277 ret = Err(e);
278 }
279 }
280 } else {
281 self.preload_stream.take();
282 }
283 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
284 if let Err(e) = ret {
285 tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
286 if self.preload_retry_times >= self.options.max_preload_retry_times {
287 break;
288 }
289 self.preload_retry_times += 1;
290 }
291
292 match self
293 .sstable_store
294 .prefetch_blocks(
295 &self.sst,
296 idx,
297 self.preload_end_block_idx,
298 self.options.cache_policy,
299 &mut self.stats,
300 )
301 .instrument_await("prefetch_blocks".verbose())
302 .await
303 {
304 Ok(stream) => {
305 self.preload_stream = Some(stream);
306 }
307 Err(e) => {
308 tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
309 break;
310 }
311 }
312 }
313 }
314 }
315 if !hit_cache {
316 let block = self
317 .sstable_store
318 .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
319 .await?;
320 self.block_iter = Some(BlockIterator::new(block));
321 };
322 let block_iter = self.block_iter.as_mut().unwrap();
323 if let Some(key) = seek_key {
324 block_iter.seek(key);
325 } else {
326 block_iter.seek_to_first();
327 }
328
329 self.cur_idx = idx;
330
331 Ok(())
332 }
333
334 fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
335 self.block_start_idx_inclusive
336 + self.sst.meta.block_metas
337 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
338 .partition_point(|block_meta| {
339 FullKey::decode(&block_meta.smallest_key).le(&key)
343 })
344 .saturating_sub(1) }
346}
347
348impl HummockIterator for SstableIterator {
349 type Direction = Forward;
350
351 async fn next(&mut self) -> HummockResult<()> {
352 self.stats.total_key_count += 1;
353 let block_iter = self.block_iter.as_mut().expect("no block iter");
354 if !block_iter.try_next() {
355 self.seek_idx(self.cur_idx + 1, None).await?;
357 }
358
359 Ok(())
360 }
361
362 fn key(&self) -> FullKey<&[u8]> {
363 self.block_iter.as_ref().expect("no block iter").key()
364 }
365
366 fn value(&self) -> HummockValue<&[u8]> {
367 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
368
369 HummockValue::from_slice(raw_value).expect("decode error")
370 }
371
372 fn is_valid(&self) -> bool {
373 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
374 }
375
376 async fn rewind(&mut self) -> HummockResult<()> {
377 self.init_block_prefetch_range(self.block_start_idx_inclusive);
378 self.seek_idx(self.block_start_idx_inclusive, None).await?;
380 Ok(())
381 }
382
383 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
384 let block_idx = self.calculate_block_idx_by_key(key);
385 self.init_block_prefetch_range(block_idx);
386
387 self.seek_idx(block_idx, Some(key)).await?;
388 if !self.is_valid() {
389 self.seek_idx(block_idx + 1, None).await?;
391 }
392 Ok(())
393 }
394
395 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
396 stats.add(&self.stats);
397 }
398
399 fn value_meta(&self) -> ValueMeta {
400 ValueMeta {
401 object_id: Some(self.sst.id),
402 block_id: Some(self.cur_idx as _),
403 }
404 }
405}
406
407impl SstableIteratorType for SstableIterator {
408 fn create(
409 sstable: TableHolder,
410 sstable_store: SstableStoreRef,
411 options: Arc<SstableIteratorReadOptions>,
412 sstable_info_ref: &SstableInfo,
413 ) -> Self {
414 SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use std::collections::Bound;
421
422 use bytes::Bytes;
423 use foyer::CacheHint;
424 use itertools::Itertools;
425 use rand::prelude::*;
426 use rand::rng as thread_rng;
427 use risingwave_common::catalog::TableId;
428 use risingwave_common::hash::VirtualNode;
429 use risingwave_common::util::epoch::test_epoch;
430 use risingwave_hummock_sdk::EpochWithGap;
431 use risingwave_hummock_sdk::key::{TableKey, UserKey};
432 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
433
434 use super::*;
435 use crate::assert_bytes_eq;
436 use crate::hummock::CachePolicy;
437 use crate::hummock::iterator::test_utils::mock_sstable_store;
438 use crate::hummock::test_utils::{
439 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
440 gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
441 };
442
443 async fn inner_test_forward_iterator(
444 sstable_store: SstableStoreRef,
445 handle: TableHolder,
446 sstable_info: SstableInfo,
447 ) {
448 let mut sstable_iter = SstableIterator::create(
451 handle,
452 sstable_store,
453 Arc::new(SstableIteratorReadOptions::default()),
454 &sstable_info,
455 );
456 let mut cnt = 0;
457 sstable_iter.rewind().await.unwrap();
458
459 while sstable_iter.is_valid() {
460 let key = sstable_iter.key();
461 let value = sstable_iter.value();
462 assert_eq!(key, test_key_of(cnt).to_ref());
463 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
464 cnt += 1;
465 sstable_iter.next().await.unwrap();
466 }
467
468 assert_eq!(cnt, TEST_KEYS_COUNT);
469 }
470
471 #[tokio::test]
472 async fn test_table_iterator() {
473 let sstable_store = mock_sstable_store().await;
475 let (sstable, sstable_info) =
476 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
477 .await;
478 assert!(sstable.meta.block_metas.len() > 10);
481
482 inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
483 }
484
485 #[tokio::test]
486 async fn test_table_seek() {
487 let sstable_store = mock_sstable_store().await;
488 let (sstable, sstable_info) =
489 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
490 .await;
491 assert!(sstable.meta.block_metas.len() > 10);
494 let mut sstable_iter = SstableIterator::create(
495 sstable,
496 sstable_store,
497 Arc::new(SstableIteratorReadOptions::default()),
498 &sstable_info,
499 );
500 let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
501 let mut rng = thread_rng();
502 all_key_to_test.shuffle(&mut rng);
503
504 for i in all_key_to_test {
506 sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
507 let key = sstable_iter.key();
509 assert_eq!(key, test_key_of(i).to_ref());
510 }
511
512 sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
514 for i in 500..TEST_KEYS_COUNT {
515 let key = sstable_iter.key();
516 assert_eq!(key, test_key_of(i).to_ref());
517 sstable_iter.next().await.unwrap();
518 }
519 assert!(!sstable_iter.is_valid());
520
521 let smallest_key = FullKey::for_test(
523 TableId::default(),
524 [
525 VirtualNode::ZERO.to_be_bytes().as_slice(),
526 format!("key_aaaa_{:05}", 0).as_bytes(),
527 ]
528 .concat(),
529 test_epoch(233),
530 );
531 sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
532 let key = sstable_iter.key();
533 assert_eq!(key, test_key_of(0).to_ref());
534
535 let largest_key = FullKey::for_test(
537 TableId::default(),
538 [
539 VirtualNode::ZERO.to_be_bytes().as_slice(),
540 format!("key_zzzz_{:05}", 0).as_bytes(),
541 ]
542 .concat(),
543 test_epoch(233),
544 );
545 sstable_iter.seek(largest_key.to_ref()).await.unwrap();
546 assert!(!sstable_iter.is_valid());
547
548 for idx in 1..TEST_KEYS_COUNT {
550 sstable_iter
555 .seek(
556 FullKey::for_test(
557 TableId::default(),
558 [
559 VirtualNode::ZERO.to_be_bytes().as_slice(),
560 format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
561 ]
562 .concat(),
563 0,
564 )
565 .to_ref(),
566 )
567 .await
568 .unwrap();
569
570 let key = sstable_iter.key();
571 assert_eq!(key, test_key_of(idx).to_ref());
572 sstable_iter.next().await.unwrap();
573 }
574 assert!(!sstable_iter.is_valid());
575 }
576
577 #[tokio::test]
578 async fn test_prefetch_table_read() {
579 let sstable_store = mock_sstable_store().await;
580 let kv_iter =
582 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
583 let sst_info = gen_test_sstable_info(
584 default_builder_opt_for_test(),
585 0,
586 kv_iter,
587 sstable_store.clone(),
588 )
589 .await;
590
591 let end_key = test_key_of(TEST_KEYS_COUNT);
592 let uk = UserKey::new(
593 end_key.user_key.table_id,
594 TableKey(Bytes::from(end_key.user_key.table_key.0)),
595 );
596 let options = Arc::new(SstableIteratorReadOptions {
597 cache_policy: CachePolicy::Fill(CacheHint::Normal),
598 must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
599 max_preload_retry_times: 0,
600 prefetch_for_large_query: false,
601 });
602 let mut stats = StoreLocalStatistic::default();
603 let mut sstable_iter = SstableIterator::create(
604 sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
605 sstable_store.clone(),
606 options.clone(),
607 &sst_info,
608 );
609 let mut cnt = 1000;
610 sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
611 while sstable_iter.is_valid() {
612 let key = sstable_iter.key();
613 let value = sstable_iter.value();
614 assert_eq!(
615 key,
616 test_key_of(cnt).to_ref(),
617 "fail at {}, get key :{:?}",
618 cnt,
619 String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
620 );
621 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
622 cnt += 1;
623 sstable_iter.next().await.unwrap();
624 }
625 assert_eq!(cnt, TEST_KEYS_COUNT);
626 let mut sstable_iter = SstableIterator::create(
627 sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
628 sstable_store,
629 options.clone(),
630 &sst_info,
631 );
632 let mut cnt = 1000;
633 sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
634 while sstable_iter.is_valid() {
635 let key = sstable_iter.key();
636 let value = sstable_iter.value();
637 assert_eq!(key, test_key_of(cnt).to_ref());
638 assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
639 cnt += 1;
640 sstable_iter.next().await.unwrap();
641 }
642 assert_eq!(cnt, TEST_KEYS_COUNT);
643 }
644
645 #[tokio::test]
646 async fn test_read_table_id_range() {
647 {
648 let sstable_store = mock_sstable_store().await;
649 let (sstable, sstable_info) =
650 gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
651 .await;
652 let mut sstable_iter = SstableIterator::create(
653 sstable,
654 sstable_store.clone(),
655 Arc::new(SstableIteratorReadOptions::default()),
656 &sstable_info,
657 );
658 sstable_iter.rewind().await.unwrap();
659 assert!(sstable_iter.is_valid());
660 assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
661 }
662
663 {
664 let sstable_store = mock_sstable_store().await;
665 let k1 = {
667 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
668 table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
669 let uk = UserKey::for_test(TableId::from(1), table_key);
670 FullKey {
671 user_key: uk,
672 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
673 }
674 };
675
676 let k2 = {
677 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
678 table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
679 let uk = UserKey::for_test(TableId::from(2), table_key);
680 FullKey {
681 user_key: uk,
682 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
683 }
684 };
685
686 let k3 = {
687 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
688 table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
689 let uk = UserKey::for_test(TableId::from(3), table_key);
690 FullKey {
691 user_key: uk,
692 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
693 }
694 };
695
696 {
697 let kv_pairs = vec![
698 (k1.clone(), HummockValue::put(test_value_of(1))),
699 (k2.clone(), HummockValue::put(test_value_of(2))),
700 (k3.clone(), HummockValue::put(test_value_of(3))),
701 ];
702
703 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
704 default_builder_opt_for_test(),
705 10,
706 kv_pairs.into_iter(),
707 sstable_store.clone(),
708 vec![1, 2, 3],
709 )
710 .await;
711 let mut sstable_iter = SstableIterator::create(
712 sstable,
713 sstable_store.clone(),
714 Arc::new(SstableIteratorReadOptions::default()),
715 &SstableInfo::from(SstableInfoInner {
716 table_ids: vec![1, 2, 3],
717 ..Default::default()
718 }),
719 );
720 sstable_iter.rewind().await.unwrap();
721 assert!(sstable_iter.is_valid());
722 assert!(sstable_iter.key().eq(&k1.to_ref()));
723
724 let mut cnt = 0;
725 let mut last_key = k1.clone();
726 while sstable_iter.is_valid() {
727 last_key = sstable_iter.key().to_vec();
728 cnt += 1;
729 sstable_iter.next().await.unwrap();
730 }
731
732 assert_eq!(3, cnt);
733 assert_eq!(last_key, k3.clone());
734 }
735
736 {
737 let kv_pairs = vec![
738 (k1.clone(), HummockValue::put(test_value_of(1))),
739 (k2.clone(), HummockValue::put(test_value_of(2))),
740 (k3.clone(), HummockValue::put(test_value_of(3))),
741 ];
742
743 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
744 default_builder_opt_for_test(),
745 10,
746 kv_pairs.into_iter(),
747 sstable_store.clone(),
748 vec![1, 2, 3],
749 )
750 .await;
751
752 let mut sstable_iter = SstableIterator::create(
753 sstable,
754 sstable_store.clone(),
755 Arc::new(SstableIteratorReadOptions::default()),
756 &SstableInfo::from(SstableInfoInner {
757 table_ids: vec![1, 2],
758 ..Default::default()
759 }),
760 );
761 sstable_iter.rewind().await.unwrap();
762 assert!(sstable_iter.is_valid());
763 assert!(sstable_iter.key().eq(&k1.to_ref()));
764
765 let mut cnt = 0;
766 let mut last_key = k1.clone();
767 while sstable_iter.is_valid() {
768 last_key = sstable_iter.key().to_vec();
769 cnt += 1;
770 sstable_iter.next().await.unwrap();
771 }
772
773 assert_eq!(2, cnt);
774 assert_eq!(last_key, k2.clone());
775 }
776
777 {
778 let kv_pairs = vec![
779 (k1.clone(), HummockValue::put(test_value_of(1))),
780 (k2.clone(), HummockValue::put(test_value_of(2))),
781 (k3.clone(), HummockValue::put(test_value_of(3))),
782 ];
783
784 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
785 default_builder_opt_for_test(),
786 10,
787 kv_pairs.into_iter(),
788 sstable_store.clone(),
789 vec![1, 2, 3],
790 )
791 .await;
792
793 let mut sstable_iter = SstableIterator::create(
794 sstable,
795 sstable_store.clone(),
796 Arc::new(SstableIteratorReadOptions::default()),
797 &SstableInfo::from(SstableInfoInner {
798 table_ids: vec![2, 3],
799 ..Default::default()
800 }),
801 );
802 sstable_iter.rewind().await.unwrap();
803 assert!(sstable_iter.is_valid());
804 assert!(sstable_iter.key().eq(&k2.to_ref()));
805
806 let mut cnt = 0;
807 let mut last_key = k1.clone();
808 while sstable_iter.is_valid() {
809 last_key = sstable_iter.key().to_vec();
810 cnt += 1;
811 sstable_iter.next().await.unwrap();
812 }
813
814 assert_eq!(2, cnt);
815 assert_eq!(last_key, k3.clone());
816 }
817
818 {
819 let kv_pairs = vec![
820 (k1.clone(), HummockValue::put(test_value_of(1))),
821 (k2.clone(), HummockValue::put(test_value_of(2))),
822 (k3.clone(), HummockValue::put(test_value_of(3))),
823 ];
824
825 let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
826 default_builder_opt_for_test(),
827 10,
828 kv_pairs.into_iter(),
829 sstable_store.clone(),
830 vec![1, 2, 3],
831 )
832 .await;
833
834 let mut sstable_iter = SstableIterator::create(
835 sstable,
836 sstable_store.clone(),
837 Arc::new(SstableIteratorReadOptions::default()),
838 &SstableInfo::from(SstableInfoInner {
839 table_ids: vec![2],
840 ..Default::default()
841 }),
842 );
843 sstable_iter.rewind().await.unwrap();
844 assert!(sstable_iter.is_valid());
845 assert!(sstable_iter.key().eq(&k2.to_ref()));
846
847 let mut cnt = 0;
848 let mut last_key = k1.clone();
849 while sstable_iter.is_valid() {
850 last_key = sstable_iter.key().to_vec();
851 cnt += 1;
852 sstable_iter.next().await.unwrap();
853 }
854
855 assert_eq!(1, cnt);
856 assert_eq!(last_key, k2.clone());
857 }
858 }
859 }
860}