1use std::ops::Bound::*;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use thiserror_ext::AsReport;
22
23use super::super::{HummockResult, HummockValue};
24use crate::hummock::block_stream::BlockStream;
25use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
26use crate::hummock::sstable::SstableIteratorReadOptions;
27use crate::hummock::{BlockIterator, SstableStoreRef, TableHolder};
28use crate::monitor::StoreLocalStatistic;
29
/// An iterator over a single SST that can be constructed through a uniform constructor.
///
/// Because `create` returns `Self`, code that is generic over the iterator type can build a
/// concrete iterator without knowing its inherent constructor.
pub trait SstableIteratorType: HummockIterator + 'static {
    /// Builds an iterator over the already-loaded `sstable`.
    ///
    /// * `sstable_store` - store used to fetch the SST's blocks.
    /// * `read_options` - cache policy / prefetch settings for this read.
    /// * `sstable_info_ref` - metadata describing the SST being read (for [`SstableIterator`]
    ///   its `table_ids` bound the blocks that are visited).
    fn create(
        sstable: TableHolder,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
        sstable_info_ref: &SstableInfo,
    ) -> Self;
}
38
/// Forward iterator over the entries of one SST, restricted to the blocks whose table ids fall
/// within the table id range supplied at construction.
///
/// Blocks are loaded one at a time into `block_iter`. When a prefetch range is configured
/// (`preload_end_block_idx`), consecutive blocks are read through `preload_stream`; otherwise
/// each block is fetched individually from `sstable_store`.
pub struct SstableIterator {
    /// Iterator over the currently loaded block. `None` right after construction and once
    /// iteration has moved past `block_end_idx_inclusive`.
    block_iter: Option<BlockIterator>,

    /// Index (into `sst.meta.block_metas`) of the block currently loaded in `block_iter`.
    cur_idx: usize,

    /// Open stream used to read consecutive blocks ahead of time, if any.
    preload_stream: Option<Box<dyn BlockStream>>,
    /// The SST being iterated.
    pub sst: TableHolder,
    /// Exclusive end block index for prefetching; a value of 0 never enables prefetching.
    preload_end_block_idx: usize,
    /// How many times the prefetch stream has been recreated after a stream error.
    preload_retry_times: usize,

    /// Store used to fetch blocks, either individually or as a prefetch stream.
    sstable_store: SstableStoreRef,
    /// Local read statistics; merged into the caller's via `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Read options (cache policy, prefetch end key, retry limit).
    options: Arc<SstableIteratorReadOptions>,

    /// First block (inclusive) that may hold keys in the requested table id range.
    block_start_idx_inclusive: usize,
    /// Last block (inclusive) that may hold keys in the requested table id range.
    block_end_idx_inclusive: usize,
}
61
62impl SstableIterator {
63 pub fn new(
64 sstable: TableHolder,
65 sstable_store: SstableStoreRef,
66 options: Arc<SstableIteratorReadOptions>,
67 sstable_info_ref: &SstableInfo,
68 ) -> Self {
69 let mut block_start_idx_inclusive = 0;
70 let mut block_end_idx_inclusive = sstable.meta.block_metas.len() - 1;
71 assert!(
72 !sstable_info_ref.table_ids.is_empty(),
73 "SstableIterator: SST {} (object {}) has empty table_ids",
74 sstable_info_ref.sst_id,
75 sstable_info_ref.object_id,
76 );
77 let read_table_id_range = (
78 *sstable_info_ref.table_ids.first().unwrap(),
79 *sstable_info_ref.table_ids.last().unwrap(),
80 );
81 assert!(
82 read_table_id_range.0 <= read_table_id_range.1,
83 "invalid table id range {} - {}",
84 read_table_id_range.0,
85 read_table_id_range.1
86 );
87 let block_meta_count = sstable.meta.block_metas.len();
88 assert!(block_meta_count > 0);
89 assert!(
90 sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
91 "table id {} not found table_ids in block_meta {:?}",
92 read_table_id_range.0,
93 sstable
94 .meta
95 .block_metas
96 .iter()
97 .map(|meta| meta.table_id())
98 .collect::<Vec<_>>()
99 );
100 assert!(
101 sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
102 "table id {} not found table_ids in block_meta {:?}",
103 read_table_id_range.1,
104 sstable
105 .meta
106 .block_metas
107 .iter()
108 .map(|meta| meta.table_id())
109 .collect::<Vec<_>>()
110 );
111
112 while block_start_idx_inclusive < block_meta_count
113 && sstable.meta.block_metas[block_start_idx_inclusive].table_id()
114 < read_table_id_range.0
115 {
116 block_start_idx_inclusive += 1;
117 }
118 assert!(
120 block_start_idx_inclusive < block_meta_count,
121 "table id {} not found table_ids in block_meta {:?}",
122 read_table_id_range.0,
123 sstable
124 .meta
125 .block_metas
126 .iter()
127 .map(|meta| meta.table_id())
128 .collect::<Vec<_>>()
129 );
130
131 while block_end_idx_inclusive > block_start_idx_inclusive
132 && sstable.meta.block_metas[block_end_idx_inclusive].table_id() > read_table_id_range.1
133 {
134 block_end_idx_inclusive -= 1;
135 }
136 assert!(
137 block_end_idx_inclusive >= block_start_idx_inclusive,
138 "block_end_idx_inclusive {} < block_start_idx_inclusive {} block_meta_count {}",
139 block_end_idx_inclusive,
140 block_start_idx_inclusive,
141 block_meta_count
142 );
143
144 Self {
145 block_iter: None,
146 cur_idx: 0,
147 preload_stream: None,
148 sst: sstable,
149 sstable_store,
150 stats: StoreLocalStatistic::default(),
151 options,
152 preload_end_block_idx: 0,
153 preload_retry_times: 0,
154 block_start_idx_inclusive,
155 block_end_idx_inclusive,
156 }
157 }
158
159 fn init_block_prefetch_range(&mut self, start_idx: usize) {
160 assert!(
161 start_idx >= self.block_start_idx_inclusive
162 && start_idx <= self.block_end_idx_inclusive
163 );
164
165 self.preload_end_block_idx = 0;
166 if let Some(bound) = self.options.must_iterated_end_user_key.as_ref() {
167 let block_metas = &self.sst.meta.block_metas
168 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive];
169 let next_to_start_idx = start_idx + 1;
170 if next_to_start_idx <= self.block_end_idx_inclusive {
171 let end_idx = match bound {
172 Unbounded => self.block_end_idx_inclusive + 1,
173 Included(dest_key) => {
174 let dest_key = dest_key.as_ref();
175 self.block_start_idx_inclusive
176 + block_metas.partition_point(|block_meta| {
177 FullKey::decode(&block_meta.smallest_key).user_key <= dest_key
178 })
179 }
180 Excluded(end_key) => {
181 let end_key = end_key.as_ref();
182 self.block_start_idx_inclusive
183 + block_metas.partition_point(|block_meta| {
184 FullKey::decode(&block_meta.smallest_key).user_key < end_key
185 })
186 }
187 };
188
189 if next_to_start_idx < end_idx {
191 assert!(
192 end_idx <= self.block_end_idx_inclusive + 1,
193 "end_idx {} > block_end_idx_inclusive {} next_to_start_idx {}",
194 end_idx,
195 self.block_end_idx_inclusive,
196 next_to_start_idx
197 );
198 self.preload_end_block_idx = end_idx;
199 }
200 }
201 }
202 }
203
204 async fn seek_idx(
206 &mut self,
207 idx: usize,
208 seek_key: Option<FullKey<&[u8]>>,
209 ) -> HummockResult<()> {
210 tracing::debug!(
211 target: "events::storage::sstable::block_seek",
212 "table iterator seek: sstable_object_id = {}, block_id = {}",
213 self.sst.id,
214 idx,
215 );
216
217 tokio::task::consume_budget().await;
221
222 let mut hit_cache = false;
223 if idx > self.block_end_idx_inclusive {
224 self.block_iter = None;
225 return Ok(());
226 }
227 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
229 match self
230 .sstable_store
231 .prefetch_blocks(
232 &self.sst,
233 idx,
234 self.preload_end_block_idx,
235 self.options.cache_policy,
236 &mut self.stats,
237 )
238 .instrument_await("prefetch_blocks".verbose())
239 .await
240 {
241 Ok(preload_stream) => self.preload_stream = Some(preload_stream),
242 Err(e) => {
243 tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get")
244 }
245 }
246 }
247
248 if self
249 .preload_stream
250 .as_ref()
251 .map(|preload_stream| preload_stream.next_block_index() <= idx)
252 .unwrap_or(false)
253 {
254 while let Some(preload_stream) = self.preload_stream.as_mut() {
255 let mut ret = Ok(());
256 while preload_stream.next_block_index() < idx {
257 if let Err(e) = preload_stream.next_block().await {
258 ret = Err(e);
259 break;
260 }
261 }
262 assert_eq!(preload_stream.next_block_index(), idx);
263 if ret.is_ok() {
264 match preload_stream.next_block().await {
265 Ok(Some(block)) => {
266 hit_cache = true;
267 self.block_iter = Some(BlockIterator::new(block));
268 break;
269 }
270 Ok(None) => {
271 self.preload_stream.take();
272 }
273 Err(e) => {
274 self.preload_stream.take();
275 ret = Err(e);
276 }
277 }
278 } else {
279 self.preload_stream.take();
280 }
281 if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx {
282 if let Err(e) = ret {
283 tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed");
284 if self.preload_retry_times >= self.options.max_preload_retry_times {
285 break;
286 }
287 self.preload_retry_times += 1;
288 }
289
290 match self
291 .sstable_store
292 .prefetch_blocks(
293 &self.sst,
294 idx,
295 self.preload_end_block_idx,
296 self.options.cache_policy,
297 &mut self.stats,
298 )
299 .instrument_await("prefetch_blocks".verbose())
300 .await
301 {
302 Ok(stream) => {
303 self.preload_stream = Some(stream);
304 }
305 Err(e) => {
306 tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error");
307 break;
308 }
309 }
310 }
311 }
312 }
313 if !hit_cache {
314 let block = self
315 .sstable_store
316 .get(&self.sst, idx, self.options.cache_policy, &mut self.stats)
317 .await?;
318 self.block_iter = Some(BlockIterator::new(block));
319 };
320 let block_iter = self.block_iter.as_mut().unwrap();
321 if let Some(key) = seek_key {
322 block_iter.seek(key);
323 } else {
324 block_iter.seek_to_first();
325 }
326
327 self.cur_idx = idx;
328
329 Ok(())
330 }
331
332 fn calculate_block_idx_by_key(&self, key: FullKey<&[u8]>) -> usize {
333 self.block_start_idx_inclusive
334 + self.sst.meta.block_metas
335 [self.block_start_idx_inclusive..=self.block_end_idx_inclusive]
336 .partition_point(|block_meta| {
337 FullKey::decode(&block_meta.smallest_key).le(&key)
341 })
342 .saturating_sub(1) }
344}
345
346impl HummockIterator for SstableIterator {
347 type Direction = Forward;
348
349 async fn next(&mut self) -> HummockResult<()> {
350 self.stats.total_key_count += 1;
351 let block_iter = self.block_iter.as_mut().expect("no block iter");
352 if !block_iter.try_next() {
353 self.seek_idx(self.cur_idx + 1, None).await?;
355 }
356
357 Ok(())
358 }
359
360 fn key(&self) -> FullKey<&[u8]> {
361 self.block_iter.as_ref().expect("no block iter").key()
362 }
363
364 fn value(&self) -> HummockValue<&[u8]> {
365 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
366
367 HummockValue::from_slice(raw_value).expect("decode error")
368 }
369
370 fn is_valid(&self) -> bool {
371 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
372 }
373
374 async fn rewind(&mut self) -> HummockResult<()> {
375 self.init_block_prefetch_range(self.block_start_idx_inclusive);
376 self.seek_idx(self.block_start_idx_inclusive, None).await?;
378 Ok(())
379 }
380
381 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
382 let block_idx = self.calculate_block_idx_by_key(key);
383 self.init_block_prefetch_range(block_idx);
384
385 self.seek_idx(block_idx, Some(key)).await?;
386 if !self.is_valid() {
387 self.seek_idx(block_idx + 1, None).await?;
389 }
390 Ok(())
391 }
392
393 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
394 stats.add(&self.stats);
395 }
396
397 fn value_meta(&self) -> ValueMeta {
398 ValueMeta {
399 object_id: Some(self.sst.id),
400 block_id: Some(self.cur_idx as _),
401 }
402 }
403}
404
405impl SstableIteratorType for SstableIterator {
406 fn create(
407 sstable: TableHolder,
408 sstable_store: SstableStoreRef,
409 options: Arc<SstableIteratorReadOptions>,
410 sstable_info_ref: &SstableInfo,
411 ) -> Self {
412 SstableIterator::new(sstable, sstable_store, options, sstable_info_ref)
413 }
414}
415
#[cfg(test)]
mod tests {
    use std::collections::Bound;

    use bytes::Bytes;
    use foyer::Hint;
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{TableKey, UserKey};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::CachePolicy;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_info, gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// Rewinds an iterator over `handle` and checks that a full forward scan yields exactly
    /// `test_key_of(i)` / `test_value_of(i)` for every `i` in `0..TEST_KEYS_COUNT`, in order.
    async fn inner_test_forward_iterator(
        sstable_store: SstableStoreRef,
        handle: TableHolder,
        sstable_info: SstableInfo,
    ) {
        let mut sstable_iter = SstableIterator::create(
            handle,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &sstable_info,
        );
        let mut cnt = 0;
        sstable_iter.rewind().await.unwrap();

        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(key, test_key_of(cnt).to_ref());
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }

        assert_eq!(cnt, TEST_KEYS_COUNT);
    }

    /// Full forward scan over the default test SST.
    #[tokio::test]
    async fn test_table_iterator() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // Ensure the SST has more than 10 blocks so the scan crosses block boundaries.
        assert!(sstable.meta.block_metas.len() > 10);

        inner_test_forward_iterator(sstable_store.clone(), sstable, sstable_info).await;
    }

    /// Seeks to existing keys, to keys before/after every entry, and to keys between entries.
    #[tokio::test]
    async fn test_table_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        assert!(sstable.meta.block_metas.len() > 10);
        let mut sstable_iter = SstableIterator::create(
            sstable,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            &sstable_info,
        );
        // Seek to every existing key in random order; each seek must land on that exact key.
        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
        let mut rng = thread_rng();
        all_key_to_test.shuffle(&mut rng);

        for i in all_key_to_test {
            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref());
        }

        // Seek to key 500, then scan forward through the end of the SST.
        sstable_iter.seek(test_key_of(500).to_ref()).await.unwrap();
        for i in 500..TEST_KEYS_COUNT {
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // A key expected to sort before every entry: the seek must land on the first key.
        let smallest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_aaaa_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(233),
        );
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        let key = sstable_iter.key();
        assert_eq!(key, test_key_of(0).to_ref());

        // A key expected to sort after every entry: the iterator must become invalid.
        let largest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_zzzz_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(233),
        );
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // Seek to keys expected to fall between entries; each seek must land on
        // `test_key_of(idx)`. NOTE(review): this presumes `test_key_of(idx)` uses the suffix
        // `idx * 2`, so `idx * 2 - 1` sorts just before it — confirm in `test_utils`.
        for idx in 1..TEST_KEYS_COUNT {
            sstable_iter
                .seek(
                    FullKey::for_test(
                        TableId::default(),
                        [
                            VirtualNode::ZERO.to_be_bytes().as_slice(),
                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
                        ]
                        .concat(),
                        0,
                    )
                    .to_ref(),
                )
                .await
                .unwrap();

            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(idx).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    /// Seek-then-scan with `must_iterated_end_user_key` set, which makes the iterator compute a
    /// block prefetch range.
    #[tokio::test]
    async fn test_prefetch_table_read() {
        let sstable_store = mock_sstable_store().await;
        let kv_iter =
            (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
        let sst_info = gen_test_sstable_info(
            default_builder_opt_for_test(),
            0,
            kv_iter,
            sstable_store.clone(),
        )
        .await;

        // Inclusive end bound at `test_key_of(TEST_KEYS_COUNT)`, presumably sorting after every
        // written key, so prefetching may cover the rest of the SST.
        let end_key = test_key_of(TEST_KEYS_COUNT);
        let uk = UserKey::new(
            end_key.user_key.table_id,
            TableKey(Bytes::from(end_key.user_key.table_key.0)),
        );
        let options = Arc::new(SstableIteratorReadOptions {
            cache_policy: CachePolicy::Fill(Hint::Normal),
            must_iterated_end_user_key: Some(Bound::Included(uk.clone())),
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });
        let mut stats = StoreLocalStatistic::default();
        let mut sstable_iter = SstableIterator::create(
            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
            sstable_store.clone(),
            options.clone(),
            &sst_info,
        );
        let mut cnt = 1000;
        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(
                key,
                test_key_of(cnt).to_ref(),
                "fail at {}, get key :{:?}",
                cnt,
                String::from_utf8(key.user_key.table_key.key_part().to_vec()).unwrap()
            );
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }
        assert_eq!(cnt, TEST_KEYS_COUNT);
        // Repeat the same seek-and-scan with a fresh iterator over the same SST (with
        // `CachePolicy::Fill` the blocks are presumably cached by the first pass).
        let mut sstable_iter = SstableIterator::create(
            sstable_store.sstable(&sst_info, &mut stats).await.unwrap(),
            sstable_store,
            options.clone(),
            &sst_info,
        );
        let mut cnt = 1000;
        sstable_iter.seek(test_key_of(cnt).to_ref()).await.unwrap();
        while sstable_iter.is_valid() {
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(key, test_key_of(cnt).to_ref());
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            cnt += 1;
            sstable_iter.next().await.unwrap();
        }
        assert_eq!(cnt, TEST_KEYS_COUNT);
    }

    /// Checks that `SstableInfo::table_ids` restricts which entries the iterator visits.
    #[tokio::test]
    async fn test_read_table_id_range() {
        // Default test SST: rewinding must land on its first key.
        {
            let sstable_store = mock_sstable_store().await;
            let (sstable, sstable_info) =
                gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                    .await;
            let mut sstable_iter = SstableIterator::create(
                sstable,
                sstable_store.clone(),
                Arc::new(SstableIteratorReadOptions::default()),
                &sstable_info,
            );
            sstable_iter.rewind().await.unwrap();
            assert!(sstable_iter.is_valid());
            assert_eq!(sstable_iter.key(), test_key_of(0).to_ref());
        }

        // An SST with one key in each of tables 1, 2 and 3; each sub-case restricts the
        // iterator to a different table id range.
        // NOTE(review): the restriction is applied per block, so these cases presumably rely on
        // the builder placing each table's key in a separate block — confirm.
        {
            let sstable_store = mock_sstable_store().await;
            let k1 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
                let uk = UserKey::for_test(TableId::from(1), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            let k2 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
                let uk = UserKey::for_test(TableId::from(2), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            let k3 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
                let uk = UserKey::for_test(TableId::from(3), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            // Tables [1, 3]: all three keys, k1..k3.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;
                let mut sstable_iter = SstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![1.into(), 2.into(), 3.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k1.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(3, cnt);
                assert_eq!(last_key, k3.clone());
            }

            // Tables [1, 2]: the trailing table-3 key is excluded.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = SstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![1.into(), 2.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k1.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(2, cnt);
                assert_eq!(last_key, k2.clone());
            }

            // Tables [2, 3]: the leading table-1 key is excluded.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = SstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![2.into(), 3.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k2.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(2, cnt);
                assert_eq!(last_key, k3.clone());
            }

            // Table 2 only: keys of both neighbouring tables are excluded.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = SstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![2.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k2.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(1, cnt);
                assert_eq!(last_key, k2.clone());
            }
        }
    }
}