1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29
30use crate::hummock::block_stream::BlockDataStream;
31use crate::hummock::compactor::task_progress::TaskProgress;
32use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
33use crate::hummock::sstable_store::SstableStoreRef;
34use crate::hummock::value::HummockValue;
35use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
36use crate::monitor::StoreLocalStatistic;
37
/// Number of processed keys between two progress reports made by
/// `MonitoredCompactorIterator::next` to its `TaskProgress`.
const PROGRESS_KEY_INTERVAL: usize = 100;
39
/// Iterates over the KV-pairs of a single SST by streaming a contiguous range of its blocks
/// from the sstable store.
pub struct SstableStreamIterator {
    /// Store used to open the block stream (see `create_stream`).
    sstable_store: SstableStoreRef,
    /// Holder of the SST; its `meta.block_metas` describe the blocks that can be streamed.
    sstable: TableHolder,
    /// Range of indices into `sstable.meta.block_metas` that this iterator reads.
    block_metas_range: Range<usize>,
    /// Stream delivering block data; `None` until it is created on demand, and reset to `None`
    /// when a read fails and is going to be retried.
    block_stream: Option<BlockDataStream>,

    /// Iterator over the KV-pairs of the currently loaded block; `None` when no block is loaded.
    block_iter: Option<BlockIterator>,

    /// Number of blocks of `block_metas_range` loaded so far. A (re)created stream starts at
    /// this offset within the range.
    block_idx: usize,

    /// Counter to which the time spent in `next_block` (in milliseconds, rounded up) is added.
    stats_ptr: Arc<AtomicU64>,

    /// Info of the SST being read; supplies the object id, the key range and debug details.
    sstable_info: SstableInfo,

    /// Table ids to read; loaded blocks that belong to other tables are skipped.
    read_table_ids: HashSet<TableId>,
    /// Task progress whose pending-read-IO count is decremented when this iterator is dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream has been discarded for a retry so far.
    io_retry_times: usize,
    /// Retries are attempted only while `io_retry_times` is below this value.
    max_io_retry_times: usize,

    /// Decoded copy of `sstable_info.key_range.left`.
    key_range_left: FullKey<Vec<u8>>,
    /// Decoded copy of `sstable_info.key_range.right`.
    key_range_right: FullKey<Vec<u8>>,
    /// Copy of `sstable_info.key_range.right_exclusive`.
    key_range_right_exclusive: bool,
}
74
75impl SstableStreamIterator {
76 pub fn new(
91 sstable: TableHolder,
92 block_metas_range: Range<usize>,
93 sstable_info: SstableInfo,
94 stats: &StoreLocalStatistic,
95 task_progress: Arc<TaskProgress>,
96 sstable_store: SstableStoreRef,
97 max_io_retry_times: usize,
98 ) -> Self {
99 let read_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().copied());
100 let block_metas_range = {
103 let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
104 let inner_range =
105 filter_block_metas(block_metas, &read_table_ids, sstable_info.key_range.clone());
106 (block_metas_range.start + inner_range.start)
108 ..(block_metas_range.start + inner_range.end)
109 };
110
111 let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
112 let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
113 let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
114
115 Self {
116 block_stream: None,
117 block_iter: None,
118 sstable,
119 block_metas_range,
120 block_idx: 0,
121 stats_ptr: stats.remote_io_time.clone(),
122 read_table_ids,
123 sstable_info,
124 sstable_store,
125 task_progress,
126 io_retry_times: 0,
127 max_io_retry_times,
128 key_range_left,
129 key_range_right,
130 key_range_right_exclusive,
131 }
132 }
133
134 #[inline]
136 fn block_metas(&self) -> &[BlockMeta] {
137 &self.sstable.meta.block_metas[self.block_metas_range.clone()]
138 }
139
140 #[inline]
142 fn block_count(&self) -> usize {
143 self.block_metas_range.len()
144 }
145
146 async fn create_stream(&mut self) -> HummockResult<()> {
147 let block_stream = self
148 .sstable_store
149 .get_stream_for_blocks(
150 self.sstable_info.object_id,
151 &self.block_metas()[self.block_idx..],
152 )
153 .instrument_await("stream_iter_get_stream".verbose())
154 .await?;
155 self.block_stream = Some(block_stream);
156 Ok(())
157 }
158
159 async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
160 while let Some(block_iter) = self.block_iter.as_mut() {
161 if self.read_table_ids.contains(&block_iter.table_id()) {
162 return Ok(());
163 } else {
164 self.next_block().await?;
165 }
166 }
167 Ok(())
168 }
169
170 pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
175 self.next_block().await?;
177
178 let seek_key = if let Some(seek_key) = seek_key {
183 if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
184 Some(self.key_range_left.to_ref())
185 } else {
186 Some(seek_key)
187 }
188 } else {
189 Some(self.key_range_left.to_ref())
190 };
191
192 if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
193 block_iter.seek(seek_key);
194
195 if !block_iter.is_valid() {
196 self.next_block().await?;
198 }
199 }
200
201 self.prune_from_valid_block_iter().await?;
202 Ok(())
203 }
204
205 async fn next_block(&mut self) -> HummockResult<()> {
210 let now = Instant::now();
212 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
213 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
214 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
215 });
216 if self.block_idx < self.block_count() {
217 loop {
218 let ret = match &mut self.block_stream {
219 Some(block_stream) => block_stream.next_block().await,
220 None => {
221 self.create_stream().await?;
222 continue;
223 }
224 };
225 match ret {
226 Ok(Some(block)) => {
227 let mut block_iter =
228 BlockIterator::new(BlockHolder::from_owned_block(block));
229 block_iter.seek_to_first();
230 self.block_idx += 1;
231 self.block_iter = Some(block_iter);
232 return Ok(());
233 }
234 Ok(None) => break,
235 Err(e) => {
236 if !e.is_object_error() || !self.need_recreate_io_stream() {
237 return Err(e);
238 }
239 self.block_stream.take();
240 self.io_retry_times += 1;
241 fail_point!("create_stream_err");
242
243 tracing::warn!(
244 "retry create stream for sstable {} times, sstinfo={}",
245 self.io_retry_times,
246 self.sst_debug_info()
247 );
248 }
249 }
250 }
251 }
252 self.block_idx = self.block_count();
253 self.block_iter = None;
254
255 Ok(())
256 }
257
258 pub async fn next(&mut self) -> HummockResult<()> {
265 if !self.is_valid() {
266 return Ok(());
267 }
268
269 let block_iter = self.block_iter.as_mut().expect("no block iter");
270 block_iter.next();
271 if !block_iter.is_valid() {
272 self.next_block().await?;
273 self.prune_from_valid_block_iter().await?;
274 }
275
276 if !self.is_valid() {
277 return Ok(());
278 }
279
280 let key = self
282 .block_iter
283 .as_ref()
284 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
285 .key();
286
287 if self.exceed_key_range_right(key) {
288 self.block_iter = None;
289 }
290
291 Ok(())
292 }
293
294 pub fn key(&self) -> FullKey<&[u8]> {
295 let key = self
296 .block_iter
297 .as_ref()
298 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
299 .key();
300
301 assert!(
302 !self.exceed_key_range_left(key),
303 "key {:?} key_range_left {:?}",
304 key,
305 self.key_range_left.to_ref()
306 );
307
308 assert!(
309 !self.exceed_key_range_right(key),
310 "key {:?} key_range_right {:?} key_range_right_exclusive {}",
311 key,
312 self.key_range_right.to_ref(),
313 self.key_range_right_exclusive
314 );
315
316 key
317 }
318
319 pub fn value(&self) -> HummockValue<&[u8]> {
320 let raw_value = self
321 .block_iter
322 .as_ref()
323 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
324 .value();
325 HummockValue::from_slice(raw_value)
326 .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
327 }
328
329 pub fn is_valid(&self) -> bool {
330 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
332 }
333
334 fn sst_debug_info(&self) -> String {
335 format!(
336 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
337 self.sstable_info.object_id,
338 self.sstable_info.sst_id,
339 self.sstable_info.meta_offset,
340 self.sstable_info.table_ids
341 )
342 }
343
344 fn need_recreate_io_stream(&self) -> bool {
345 self.io_retry_times < self.max_io_retry_times
346 }
347
348 fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
349 key.cmp(&self.key_range_left.to_ref()).is_lt()
350 }
351
352 fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
353 if self.key_range_right_exclusive {
354 key.cmp(&self.key_range_right.to_ref()).is_ge()
355 } else {
356 key.cmp(&self.key_range_right.to_ref()).is_gt()
357 }
358 }
359}
360
361impl Drop for SstableStreamIterator {
362 fn drop(&mut self) {
363 self.task_progress.dec_num_pending_read_io()
364 }
365}
366
/// Iterates over a list of SSTs one after another, restricted to a key range. The SST currently
/// being read is streamed through a [`SstableStreamIterator`].
pub struct ConcatSstableIterator {
    /// Key range the iteration is restricted to. An empty left or right bound leaves that side
    /// unrestricted.
    key_range: KeyRange,

    /// Iterator over the SST currently being read; `None` when no SST is open.
    sstable_iter: Option<SstableStreamIterator>,

    /// Index into `sstables` of the SST currently being read.
    cur_idx: usize,

    /// SSTs to iterate over.
    // NOTE(review): `seek` binary-searches this list by right key, so the SSTs are presumably
    // ordered by key range — confirm against callers.
    sstables: Vec<SstableInfo>,

    /// Store used to load SST metadata and to stream blocks.
    sstable_store: SstableStoreRef,

    /// Local statistics, passed to the store when loading SSTs and handed out through
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Task progress; its pending-read-IO count is incremented for every SST iterator created.
    task_progress: Arc<TaskProgress>,
    /// Retry budget passed on to every [`SstableStreamIterator`].
    max_io_retry_times: usize,
}
389
390impl ConcatSstableIterator {
391 pub fn new(
395 sst_infos: Vec<SstableInfo>,
396 key_range: KeyRange,
397 sstable_store: SstableStoreRef,
398 task_progress: Arc<TaskProgress>,
399 max_io_retry_times: usize,
400 ) -> Self {
401 Self {
402 key_range,
403 sstable_iter: None,
404 cur_idx: 0,
405 sstables: sst_infos,
406 sstable_store,
407 task_progress,
408 stats: StoreLocalStatistic::default(),
409 max_io_retry_times,
410 }
411 }
412
413 #[cfg(test)]
414 pub fn for_test(
415 sst_infos: Vec<SstableInfo>,
416 key_range: KeyRange,
417 sstable_store: SstableStoreRef,
418 ) -> Self {
419 Self::new(
420 sst_infos,
421 key_range,
422 sstable_store,
423 Arc::new(TaskProgress::default()),
424 0,
425 )
426 }
427
428 async fn seek_idx(
430 &mut self,
431 idx: usize,
432 seek_key: Option<FullKey<&[u8]>>,
433 ) -> HummockResult<()> {
434 self.sstable_iter.take();
435 let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
436 {
437 (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
438 Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
439 Ordering::Greater => Some(seek_key),
440 },
441 (Some(seek_key), true) => Some(seek_key),
442 (None, true) => None,
443 (None, false) => Some(FullKey::decode(&self.key_range.left)),
444 };
445
446 self.cur_idx = idx;
447 while self.cur_idx < self.sstables.len() {
448 let table_info = &self.sstables[self.cur_idx];
449 let read_table_ids = HashSet::from_iter(table_info.table_ids.iter().copied());
450 if read_table_ids.is_empty() {
451 self.cur_idx += 1;
452 seek_key = None;
453 continue;
454 }
455 let sstable = self
456 .sstable_store
457 .sstable(table_info, &mut self.stats)
458 .instrument_await("stream_iter_sstable".verbose())
459 .await?;
460
461 let filter_key_range = match seek_key {
462 Some(seek_key) => {
463 KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
464 }
465 None => self.key_range.clone(),
466 };
467
468 let block_metas_range =
469 filter_block_metas(&sstable.meta.block_metas, &read_table_ids, filter_key_range);
470
471 let mut found = true;
472 if block_metas_range.is_empty() {
473 found = false;
474 } else {
475 self.task_progress.inc_num_pending_read_io();
476 let mut sstable_iter = SstableStreamIterator::new(
477 sstable,
478 block_metas_range,
479 table_info.clone(),
480 &self.stats,
481 self.task_progress.clone(),
482 self.sstable_store.clone(),
483 self.max_io_retry_times,
484 );
485 sstable_iter.seek(seek_key).await?;
486
487 if sstable_iter.is_valid() {
488 self.sstable_iter = Some(sstable_iter);
489 } else {
490 found = false;
491 }
492 }
493
494 if found {
495 return Ok(());
496 } else {
497 self.cur_idx += 1;
498 seek_key = None;
499 }
500 }
501 Ok(())
502 }
503}
504
505impl HummockIterator for ConcatSstableIterator {
506 type Direction = Forward;
507
508 async fn next(&mut self) -> HummockResult<()> {
509 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
510
511 sstable_iter.next().await?;
513 if sstable_iter.is_valid() {
514 Ok(())
515 } else {
516 self.seek_idx(self.cur_idx + 1, None).await?;
518 Ok(())
519 }
520 }
521
522 fn key(&self) -> FullKey<&[u8]> {
523 self.sstable_iter.as_ref().expect("no table iter").key()
524 }
525
526 fn value(&self) -> HummockValue<&[u8]> {
527 self.sstable_iter.as_ref().expect("no table iter").value()
528 }
529
530 fn is_valid(&self) -> bool {
531 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
532 }
533
534 async fn rewind(&mut self) -> HummockResult<()> {
535 self.seek_idx(0, None).await
536 }
537
538 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
540 let seek_key = if self.key_range.left.is_empty() {
541 key
542 } else {
543 match key.cmp(&FullKey::decode(&self.key_range.left)) {
544 Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
545 Ordering::Greater => key,
546 }
547 };
548 let table_idx = self.sstables.partition_point(|table| {
549 let max_sst_key = &table.key_range.right;
556 FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
557 });
558
559 self.seek_idx(table_idx, Some(key)).await
560 }
561
562 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
563 stats.add(&self.stats)
564 }
565
566 fn value_meta(&self) -> ValueMeta {
567 let iter = self.sstable_iter.as_ref().expect("no table iter");
568 assert!(iter.block_idx >= 1);
571 let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
573 ValueMeta {
574 object_id: Some(iter.sstable_info.object_id),
575 block_id: Some(absolute_block_idx as u64),
576 }
577 }
578}
579
/// Wraps an iterator and reports the number of processed keys to a `TaskProgress`.
pub struct MonitoredCompactorIterator<I> {
    /// The wrapped iterator; every call is forwarded to it.
    inner: I,
    /// Receives a progress update every `PROGRESS_KEY_INTERVAL` processed keys.
    task_progress: Arc<TaskProgress>,

    /// Number of successful `next` calls since creation or since the last `rewind`/`seek`.
    processed_key_num: usize,
}
586
587impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
588 pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
589 Self {
590 inner,
591 task_progress,
592 processed_key_num: 0,
593 }
594 }
595}
596
597impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
598 type Direction = Forward;
599
600 async fn next(&mut self) -> HummockResult<()> {
601 self.inner.next().await?;
602 self.processed_key_num += 1;
603
604 if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
605 self.task_progress
606 .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
607 }
608
609 Ok(())
610 }
611
612 fn key(&self) -> FullKey<&[u8]> {
613 self.inner.key()
614 }
615
616 fn value(&self) -> HummockValue<&[u8]> {
617 self.inner.value()
618 }
619
620 fn is_valid(&self) -> bool {
621 self.inner.is_valid()
622 }
623
624 async fn rewind(&mut self) -> HummockResult<()> {
625 self.processed_key_num = 0;
626 self.inner.rewind().await?;
627 Ok(())
628 }
629
630 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
631 self.processed_key_num = 0;
632 self.inner.seek(key).await?;
633 Ok(())
634 }
635
636 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
637 self.inner.collect_local_statistic(stats)
638 }
639
640 fn value_meta(&self) -> ValueMeta {
641 self.inner.value_meta()
642 }
643}
644
645pub(crate) fn filter_block_metas(
646 block_metas: &[BlockMeta],
647 read_table_ids: &HashSet<TableId>,
648 key_range: KeyRange,
649) -> Range<usize> {
650 if block_metas.is_empty() {
651 return 0..0;
652 }
653
654 let mut start_index = if key_range.left.is_empty() {
655 0
656 } else {
657 block_metas
659 .partition_point(|block| {
660 KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
661 != Ordering::Less
662 })
663 .saturating_sub(1)
664 };
665
666 let mut end_index = if key_range.right.is_empty() {
667 block_metas.len()
668 } else {
669 let ret = block_metas.partition_point(|block| {
670 KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
671 != Ordering::Greater
672 });
673
674 if ret == 0 {
675 return 0..0;
677 }
678
679 ret
680 }
681 .saturating_sub(1);
682
683 while start_index <= end_index {
685 let start_block_table_id = block_metas[start_index].table_id();
686 if read_table_ids.contains(&start_block_table_id) {
687 break;
688 }
689
690 let old_start_index = start_index;
692 let block_metas_to_search = &block_metas[start_index..=end_index];
693
694 start_index += block_metas_to_search
695 .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
696
697 if old_start_index == start_index {
698 break;
700 }
701 }
702
703 while start_index <= end_index {
704 let end_block_table_id = block_metas[end_index].table_id();
705 if read_table_ids.contains(&end_block_table_id) {
706 break;
707 }
708
709 let old_end_index = end_index;
710 let block_metas_to_search = &block_metas[start_index..=end_index];
711
712 end_index = start_index
713 + block_metas_to_search
714 .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
715 .saturating_sub(1);
716
717 if end_index == old_end_index {
718 break;
720 }
721 }
722
723 if start_index > end_index {
724 return 0..0;
725 }
726
727 start_index..(end_index + 1)
728}
729
#[cfg(test)]
mod tests {
    use std::cmp::Ordering;
    use std::collections::HashSet;

    use risingwave_common::catalog::TableId;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};

    use crate::hummock::BlockMeta;
    use crate::hummock::compactor::ConcatSstableIterator;
    use crate::hummock::compactor::iterator::filter_block_metas;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::iterator::{HummockIterator, MergeIterator};
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info,
        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Asserts that `iter` yields the test KV-pairs with the given indices, in order, and
    /// advances it past the last checked pair.
    async fn assert_test_pairs(
        iter: &mut ConcatSstableIterator,
        key_indices: std::ops::Range<usize>,
    ) {
        for idx in key_indices {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                val.into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }
    }

    /// Builds one block meta per entry of `table_ids`; only the smallest key is filled in.
    fn block_metas_of(table_ids: &[u32]) -> Vec<BlockMeta> {
        table_ids
            .iter()
            .map(|&table_id| BlockMeta {
                smallest_key: FullKey::for_test(TableId::new(table_id), Vec::default(), 0)
                    .encode(),
                ..Default::default()
            })
            .collect()
    }

    /// Runs `filter_block_metas` without key-range restriction and checks the number of
    /// selected blocks as well as the table id found at each listed position of the result.
    fn assert_filtered(
        block_table_ids: &[u32],
        read_table_ids: &[u32],
        expected_len: usize,
        expected_table_ids: &[(usize, u32)],
    ) {
        let block_metas = block_metas_of(block_table_ids);
        let read_table_ids: HashSet<TableId> =
            read_table_ids.iter().map(|&id| id.into()).collect();

        let ret = filter_block_metas(&block_metas, &read_table_ids, KeyRange::default());
        let ret = &block_metas[ret];

        assert_eq!(expected_len, ret.len());
        for &(position, table_id) in expected_table_ids {
            assert_eq!(
                table_id,
                FullKey::decode(&ret[position].smallest_key)
                    .user_key
                    .table_id
                    .as_raw_id()
            );
        }
    }

    #[tokio::test]
    async fn test_concat_iterator() {
        let sstable_store = mock_sstable_store().await;

        // Three SSTs holding consecutive runs of `TEST_KEYS_COUNT` test keys each.
        let mut table_infos = vec![];
        for object_id in 0..3 {
            let keys = (object_id * TEST_KEYS_COUNT)..((object_id + 1) * TEST_KEYS_COUNT);
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                keys.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }

        let key_range_of = |left: usize, right: usize| {
            KeyRange::new(
                test_key_of(left).encode().into(),
                test_key_of(right).encode().into(),
            )
        };
        let start_index = 5000;
        let end_index = 25000;

        // Key range inside the data: every key of the range is produced.
        let kr = key_range_of(start_index, end_index);
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert_test_pairs(&mut iter, start_index..end_index).await;

        // Key range entirely behind the data.
        let kr = key_range_of(30000, 40000);
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert!(!iter.is_valid());

        // Key range reaching past the data: iteration stops with the last key.
        let kr = key_range_of(start_index, 40000);
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert_test_pairs(&mut iter, start_index..30000).await;
        assert!(!iter.is_valid());

        // Seeks inside a key range covering all data land in the expected SST.
        let kr = key_range_of(0, 40000);
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        for (key_idx, sst_idx) in [(10000, 1), (10001, 1), (9999, 0), (1, 0), (29999, 2)] {
            iter.seek(test_key_of(key_idx).to_ref()).await.unwrap();
            assert!(
                iter.is_valid()
                    && iter.cur_idx == sst_idx
                    && iter.key() == test_key_of(key_idx).to_ref()
            );
        }
        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());

        // Seeks outside a narrower key range.
        let kr = key_range_of(6000, 16000);
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
    }

    #[tokio::test]
    async fn test_concat_iterator_seek_idx() {
        let sstable_store = mock_sstable_store().await;

        // Three SSTs, each holding only the upper half of its run of test keys.
        let mut table_infos = vec![];
        for object_id in 0..3 {
            let keys = (object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2)
                ..((object_id + 1) * TEST_KEYS_COUNT);
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                keys.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }

        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());
        let sst = sstable_store
            .sstable(&iter.sstables[0], &mut iter.stats)
            .await
            .unwrap();
        let block_metas = &sst.meta.block_metas;
        let block_1_smallest_key = block_metas[1].smallest_key.clone();
        let block_2_smallest_key = block_metas[2].smallest_key.clone();

        // Seeking to the smallest key of block 1 lands exactly on it.
        iter.seek_idx(0, Some(FullKey::decode(&block_1_smallest_key)))
            .await
            .unwrap();
        assert!(iter.is_valid());
        assert_eq!(
            iter.key(),
            FullKey::decode(block_1_smallest_key.as_slice())
        );

        // Seeking just before it lands on the same key.
        let just_before_block_1 = prev_full_key(block_1_smallest_key.as_slice());
        iter.seek_idx(0, Some(FullKey::decode(&just_before_block_1)))
            .await
            .unwrap();
        assert!(iter.is_valid());
        assert_eq!(
            iter.key(),
            FullKey::decode(block_1_smallest_key.as_slice())
        );
        iter.next().await.unwrap();
        let block_1_second_key = iter.key().to_vec();

        // Seeking past the last key of the last SST finds nothing.
        let beyond_all_keys = test_key_of(30001);
        iter.seek_idx(table_infos.len() - 1, Some(beyond_all_keys.to_ref()))
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Key range strictly between the smallest keys of block 1 and block 2.
        let kr = KeyRange::new(
            next_full_key(&block_1_smallest_key).into(),
            prev_full_key(&block_2_smallest_key).into(),
        );
        let mut iter =
            ConcatSstableIterator::for_test(table_infos.clone(), kr.clone(), sstable_store.clone());

        // A seek key behind the right bound finds nothing.
        let behind_right_bound = FullKey::decode(&block_2_smallest_key);
        assert_eq!(
            behind_right_bound.cmp(&FullKey::decode(&kr.right)),
            Ordering::Greater
        );
        iter.seek_idx(0, Some(behind_right_bound)).await.unwrap();
        assert!(!iter.is_valid());

        // A seek key before the left bound starts at the first key inside the range.
        let before_left_bound = test_key_of(0).encode();
        iter.seek_idx(0, Some(FullKey::decode(&before_left_bound)))
            .await
            .unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());

        // So does seeking without a key.
        iter.seek_idx(0, None).await.unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());
    }

    #[tokio::test]
    async fn test_filter_block_metas() {
        // No blocks at all.
        {
            let block_metas = block_metas_of(&[]);

            let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());

            assert!(ret.is_empty());
        }

        // Every table is read.
        assert_filtered(&[1, 2, 3], &[1, 2, 3], 3, &[(0, 1), (2, 3)]);
        // The leading table is not read.
        assert_filtered(&[1, 2, 3], &[2, 3], 2, &[(0, 2), (1, 3)]);
        // The trailing table is not read.
        assert_filtered(&[1, 2, 3], &[1, 2], 2, &[(0, 1), (1, 2)]);
        // Only the middle table is read.
        assert_filtered(&[1, 2, 3], &[2], 1, &[(0, 2)]);
        // Several leading blocks of a table that is not read.
        assert_filtered(&[1, 1, 1, 2, 3], &[2], 1, &[(0, 2)]);
        // Several trailing blocks of a table that is not read.
        assert_filtered(&[1, 2, 3, 3, 3], &[2], 1, &[(0, 2)]);
        // A table in the middle that is not read stays inside the range.
        assert_filtered(&[1, 2, 3], &[1, 3], 3, &[(0, 1), (1, 2), (2, 3)]);
    }

    #[tokio::test]
    async fn test_iterator_same_obj() {
        let sstable_store = mock_sstable_store().await;

        let table_info = gen_test_sstable_info(
            default_builder_opt_for_test(),
            1_u64,
            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
        )
        .await;

        // Two SST infos over the same object, split at key 5000.
        let split_key = test_key_of(5000).encode();
        let lower_range = KeyRange {
            left: table_info.key_range.left.clone(),
            right: split_key.clone().into(),
            right_exclusive: true,
        };
        let sst_1: SstableInfo = SstableInfoInner {
            key_range: lower_range,
            ..table_info.get_inner()
        }
        .into();

        let total_key_count = sst_1.total_key_count;
        let upper_range = KeyRange {
            left: split_key.clone().into(),
            right: table_info.key_range.right.clone(),
            right_exclusive: table_info.key_range.right_exclusive,
        };
        let sst_2: SstableInfo = SstableInfoInner {
            sst_id: sst_1.sst_id + 1,
            key_range: upper_range,
            ..table_info.get_inner()
        }
        .into();

        // Both halves behind one concat iterator: every key shows up exactly once.
        {
            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());

            let mut iter = ConcatSstableIterator::for_test(
                vec![sst_1.clone(), sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            iter.rewind().await.unwrap();

            let mut key_count = 0;
            while iter.is_valid() {
                let is_new_user_key = full_key_tracker.observe(iter.key());
                assert!(is_new_user_key);
                key_count += 1;
                iter.next().await.unwrap();
            }

            assert_eq!(total_key_count, key_count);
        }

        // Each half behind its own concat iterator, merged.
        {
            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
            let lower_half = ConcatSstableIterator::for_test(
                vec![sst_1.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );
            let upper_half = ConcatSstableIterator::for_test(
                vec![sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            let mut key_count = 0;
            let mut iter = MergeIterator::for_compactor(vec![lower_half, upper_half]);
            iter.rewind().await.unwrap();
            while iter.is_valid() {
                full_key_tracker.observe(iter.key());
                key_count += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(total_key_count, key_count);
        }
    }

    #[tokio::test]
    async fn test_concat_iterator_skips_hole_table_blocks() {
        let sstable_store = mock_sstable_store().await;

        let key_1 = FullKey::for_test(TableId::new(1), b"a".to_vec(), test_epoch(1));
        let key_2 = FullKey::for_test(TableId::new(2), b"b".to_vec(), test_epoch(1));
        let key_3 = FullKey::for_test(TableId::new(3), b"c".to_vec(), test_epoch(1));
        let kv_pairs = vec![
            (key_1.clone(), HummockValue::put(b"value-1".to_vec())),
            (key_2.clone(), HummockValue::put(b"value-2".to_vec())),
            (key_3.clone(), HummockValue::put(b"value-3".to_vec())),
        ];

        let (_sstable, table_info) = gen_test_sstable_with_table_ids(
            default_builder_opt_for_test(),
            10,
            kv_pairs.into_iter(),
            sstable_store.clone(),
            vec![1, 2, 3],
        )
        .await;

        // The SST info lists tables 1 and 3 only, leaving table 2 as a hole in the middle.
        let table_info: SstableInfo = SstableInfoInner {
            table_ids: vec![1.into(), 3.into()],
            ..table_info.get_inner()
        }
        .into();

        // Iterating from the start never yields the key of table 2.
        let mut iter = ConcatSstableIterator::for_test(
            vec![table_info.clone()],
            KeyRange::default(),
            sstable_store.clone(),
        );
        iter.rewind().await.unwrap();
        for expected_key in [&key_1, &key_3] {
            assert!(iter.is_valid());
            assert_eq!(iter.key(), expected_key.to_ref());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Seeking to the key of table 2 lands on the key of table 3.
        let mut iter =
            ConcatSstableIterator::for_test(vec![table_info], KeyRange::default(), sstable_store);
        iter.seek(key_2.to_ref()).await.unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), key_3.to_ref());
    }
}