1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::compaction_group::StateTableId;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30
31use crate::hummock::block_stream::BlockDataStream;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
37use crate::monitor::StoreLocalStatistic;
38
/// Number of keys a `MonitoredCompactorIterator` processes between two progress reports to
/// its `TaskProgress` (see `MonitoredCompactorIterator::next`).
const PROGRESS_KEY_INTERVAL: usize = 100;
40
/// Iterates over the KV-pairs of a single SST by streaming its blocks from the sstable store.
///
/// Two nested cursors are maintained: `block_stream` yields whole blocks, and `block_iter`
/// walks the KV-pairs of the block that was loaded most recently.
pub struct SstableStreamIterator {
    /// Store used to open the block data stream (see `create_stream`).
    sstable_store: SstableStoreRef,
    /// Holder of the SST; its `meta.block_metas` describe the blocks being streamed.
    sstable: TableHolder,
    /// Absolute range into `sstable.meta.block_metas` covered by this iterator, as narrowed by
    /// `filter_block_metas` in `new`.
    block_metas_range: Range<usize>,
    /// Stream of block data. `None` until the first block is requested, and reset to `None`
    /// before the stream is re-created after a retryable error.
    block_stream: Option<BlockDataStream>,

    /// Iterator over the current block. `None` before the first block is loaded, after the
    /// last block was consumed, or once `next` moved past the right bound of the key range.
    block_iter: Option<BlockIterator>,

    /// Number of blocks already taken from the stream, relative to `block_metas_range.start`.
    /// While a block is loaded, that block has relative index `block_idx - 1`.
    block_idx: usize,

    /// Counter to which `next_block` adds the elapsed time in milliseconds; cloned from
    /// `StoreLocalStatistic::remote_io_time` in `new`.
    stats_ptr: Arc<AtomicU64>,

    /// Metadata of the SST; provides the object id for streaming, the key range and the
    /// values printed by `sst_debug_info`.
    sstable_info: SstableInfo,

    /// Table ids whose blocks are yielded; blocks of any other table are skipped.
    compact_table_ids: HashSet<StateTableId>,
    /// Progress tracker whose pending-read-IO counter is decremented when this iterator is
    /// dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream was re-created after an error so far.
    io_retry_times: usize,
    /// Upper bound for `io_retry_times`; once reached, stream errors are returned.
    max_io_retry_times: usize,

    /// Decoded `sstable_info.key_range.left`; seeks are clamped to start no earlier than this.
    key_range_left: FullKey<Vec<u8>>,
    /// Decoded `sstable_info.key_range.right`.
    key_range_right: FullKey<Vec<u8>>,
    /// Whether `key_range_right` itself is excluded from the range.
    key_range_right_exclusive: bool,
}
75
76impl SstableStreamIterator {
77 pub fn new(
92 sstable: TableHolder,
93 block_metas_range: Range<usize>,
94 sstable_info: SstableInfo,
95 stats: &StoreLocalStatistic,
96 task_progress: Arc<TaskProgress>,
97 sstable_store: SstableStoreRef,
98 max_io_retry_times: usize,
99 compact_table_ids: HashSet<StateTableId>,
100 ) -> Self {
101 let block_metas_range = {
104 let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
105 let inner_range = filter_block_metas(
106 block_metas,
107 &compact_table_ids,
108 sstable_info.key_range.clone(),
109 );
110 (block_metas_range.start + inner_range.start)
112 ..(block_metas_range.start + inner_range.end)
113 };
114
115 let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
116 let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
117 let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
118
119 Self {
120 block_stream: None,
121 block_iter: None,
122 sstable,
123 block_metas_range,
124 block_idx: 0,
125 stats_ptr: stats.remote_io_time.clone(),
126 compact_table_ids,
127 sstable_info,
128 sstable_store,
129 task_progress,
130 io_retry_times: 0,
131 max_io_retry_times,
132 key_range_left,
133 key_range_right,
134 key_range_right_exclusive,
135 }
136 }
137
138 #[inline]
140 fn block_metas(&self) -> &[BlockMeta] {
141 &self.sstable.meta.block_metas[self.block_metas_range.clone()]
142 }
143
144 #[inline]
146 fn block_count(&self) -> usize {
147 self.block_metas_range.len()
148 }
149
150 async fn create_stream(&mut self) -> HummockResult<()> {
151 let block_stream = self
152 .sstable_store
153 .get_stream_for_blocks(
154 self.sstable_info.object_id,
155 &self.block_metas()[self.block_idx..],
156 )
157 .instrument_await("stream_iter_get_stream".verbose())
158 .await?;
159 self.block_stream = Some(block_stream);
160 Ok(())
161 }
162
163 async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
164 while let Some(block_iter) = self.block_iter.as_mut() {
165 if self.compact_table_ids.contains(&block_iter.table_id()) {
166 return Ok(());
167 } else {
168 self.next_block().await?;
169 }
170 }
171 Ok(())
172 }
173
174 pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
179 self.next_block().await?;
181
182 let seek_key = if let Some(seek_key) = seek_key {
187 if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
188 Some(self.key_range_left.to_ref())
189 } else {
190 Some(seek_key)
191 }
192 } else {
193 Some(self.key_range_left.to_ref())
194 };
195
196 if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
197 block_iter.seek(seek_key);
198
199 if !block_iter.is_valid() {
200 self.next_block().await?;
202 }
203 }
204
205 self.prune_from_valid_block_iter().await?;
206 Ok(())
207 }
208
209 async fn next_block(&mut self) -> HummockResult<()> {
214 let now = Instant::now();
216 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
217 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
218 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
219 });
220 if self.block_idx < self.block_count() {
221 loop {
222 let ret = match &mut self.block_stream {
223 Some(block_stream) => block_stream.next_block().await,
224 None => {
225 self.create_stream().await?;
226 continue;
227 }
228 };
229 match ret {
230 Ok(Some(block)) => {
231 let mut block_iter =
232 BlockIterator::new(BlockHolder::from_owned_block(block));
233 block_iter.seek_to_first();
234 self.block_idx += 1;
235 self.block_iter = Some(block_iter);
236 return Ok(());
237 }
238 Ok(None) => break,
239 Err(e) => {
240 if !e.is_object_error() || !self.need_recreate_io_stream() {
241 return Err(e);
242 }
243 self.block_stream.take();
244 self.io_retry_times += 1;
245 fail_point!("create_stream_err");
246
247 tracing::warn!(
248 "retry create stream for sstable {} times, sstinfo={}",
249 self.io_retry_times,
250 self.sst_debug_info()
251 );
252 }
253 }
254 }
255 }
256 self.block_idx = self.block_count();
257 self.block_iter = None;
258
259 Ok(())
260 }
261
262 pub async fn next(&mut self) -> HummockResult<()> {
269 if !self.is_valid() {
270 return Ok(());
271 }
272
273 let block_iter = self.block_iter.as_mut().expect("no block iter");
274 block_iter.next();
275 if !block_iter.is_valid() {
276 self.next_block().await?;
277 self.prune_from_valid_block_iter().await?;
278 }
279
280 if !self.is_valid() {
281 return Ok(());
282 }
283
284 let key = self
286 .block_iter
287 .as_ref()
288 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
289 .key();
290
291 if self.exceed_key_range_right(key) {
292 self.block_iter = None;
293 }
294
295 Ok(())
296 }
297
298 pub fn key(&self) -> FullKey<&[u8]> {
299 let key = self
300 .block_iter
301 .as_ref()
302 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
303 .key();
304
305 assert!(
306 !self.exceed_key_range_left(key),
307 "key {:?} key_range_left {:?}",
308 key,
309 self.key_range_left.to_ref()
310 );
311
312 assert!(
313 !self.exceed_key_range_right(key),
314 "key {:?} key_range_right {:?} key_range_right_exclusive {}",
315 key,
316 self.key_range_right.to_ref(),
317 self.key_range_right_exclusive
318 );
319
320 key
321 }
322
323 pub fn value(&self) -> HummockValue<&[u8]> {
324 let raw_value = self
325 .block_iter
326 .as_ref()
327 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
328 .value();
329 HummockValue::from_slice(raw_value)
330 .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
331 }
332
333 pub fn is_valid(&self) -> bool {
334 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
336 }
337
338 fn sst_debug_info(&self) -> String {
339 format!(
340 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
341 self.sstable_info.object_id,
342 self.sstable_info.sst_id,
343 self.sstable_info.meta_offset,
344 self.sstable_info.table_ids
345 )
346 }
347
348 fn need_recreate_io_stream(&self) -> bool {
349 self.io_retry_times < self.max_io_retry_times
350 }
351
352 fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
353 key.cmp(&self.key_range_left.to_ref()).is_lt()
354 }
355
356 fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
357 if self.key_range_right_exclusive {
358 key.cmp(&self.key_range_right.to_ref()).is_ge()
359 } else {
360 key.cmp(&self.key_range_right.to_ref()).is_gt()
361 }
362 }
363}
364
impl Drop for SstableStreamIterator {
    /// Decrements the pending-read-IO counter of the task progress. The matching increment is
    /// done by `ConcatSstableIterator::seek_idx` right before it constructs the iterator.
    fn drop(&mut self) {
        self.task_progress.dec_num_pending_read_io()
    }
}
370
/// Iterates over a list of SSTs one after another, streaming each of them through a
/// `SstableStreamIterator`.
pub struct ConcatSstableIterator {
    /// Key range of the iteration. Seek keys are clamped to its left bound and both bounds are
    /// used to select the blocks that get streamed; an empty bound means unbounded.
    key_range: KeyRange,

    /// Iterator over the SST at `cur_idx`; `None` if the iterator is not positioned.
    sstable_iter: Option<SstableStreamIterator>,

    /// Index into `sstables` of the SST that is currently iterated.
    cur_idx: usize,

    /// SSTs to iterate, visited in index order.
    /// NOTE(review): `seek` binary-searches on `key_range.right`, so the list is presumably
    /// sorted by key range — confirm against callers.
    sstables: Vec<SstableInfo>,

    /// Table ids to yield; SSTs and blocks containing none of them are skipped.
    compact_table_ids: HashSet<StateTableId>,

    /// Store from which SST metadata and block streams are loaded.
    sstable_store: SstableStoreRef,

    /// Local statistics, handed to the sstable store and to every SST iterator.
    stats: StoreLocalStatistic,
    /// Progress tracker; its pending-read-IO counter is incremented per SST iterator created.
    task_progress: Arc<TaskProgress>,
    /// Retry limit passed on to every `SstableStreamIterator`.
    max_io_retry_times: usize,
}
395
396impl ConcatSstableIterator {
397 pub fn new(
401 compact_table_ids: Vec<StateTableId>,
402 sst_infos: Vec<SstableInfo>,
403 key_range: KeyRange,
404 sstable_store: SstableStoreRef,
405 task_progress: Arc<TaskProgress>,
406 max_io_retry_times: usize,
407 ) -> Self {
408 Self {
409 key_range,
410 sstable_iter: None,
411 cur_idx: 0,
412 sstables: sst_infos,
413 compact_table_ids: HashSet::from_iter(compact_table_ids),
414 sstable_store,
415 task_progress,
416 stats: StoreLocalStatistic::default(),
417 max_io_retry_times,
418 }
419 }
420
421 #[cfg(test)]
422 pub fn for_test(
423 existing_table_ids: Vec<impl Into<StateTableId>>,
424 sst_infos: Vec<SstableInfo>,
425 key_range: KeyRange,
426 sstable_store: SstableStoreRef,
427 ) -> Self {
428 Self::new(
429 existing_table_ids.into_iter().map(Into::into).collect(),
430 sst_infos,
431 key_range,
432 sstable_store,
433 Arc::new(TaskProgress::default()),
434 0,
435 )
436 }
437
438 async fn seek_idx(
440 &mut self,
441 idx: usize,
442 seek_key: Option<FullKey<&[u8]>>,
443 ) -> HummockResult<()> {
444 self.sstable_iter.take();
445 let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
446 {
447 (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
448 Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
449 Ordering::Greater => Some(seek_key),
450 },
451 (Some(seek_key), true) => Some(seek_key),
452 (None, true) => None,
453 (None, false) => Some(FullKey::decode(&self.key_range.left)),
454 };
455
456 self.cur_idx = idx;
457 while self.cur_idx < self.sstables.len() {
458 let table_info = &self.sstables[self.cur_idx];
459 let mut found = table_info
460 .table_ids
461 .iter()
462 .any(|table_id| self.compact_table_ids.contains(table_id));
463 if !found {
464 self.cur_idx += 1;
465 seek_key = None;
466 continue;
467 }
468 let sstable = self
469 .sstable_store
470 .sstable(table_info, &mut self.stats)
471 .instrument_await("stream_iter_sstable".verbose())
472 .await?;
473
474 let filter_key_range = match seek_key {
475 Some(seek_key) => {
476 KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
477 }
478 None => self.key_range.clone(),
479 };
480
481 let compact_sstable_table_ids = {
482 let sstable_table_ids = HashSet::from_iter(table_info.table_ids.iter().cloned());
484 sstable_table_ids
485 .intersection(&self.compact_table_ids)
486 .cloned()
487 .collect()
488 };
489
490 let block_metas_range = filter_block_metas(
491 &sstable.meta.block_metas,
492 &compact_sstable_table_ids,
493 filter_key_range,
494 );
495
496 if block_metas_range.is_empty() {
497 found = false;
498 } else {
499 self.task_progress.inc_num_pending_read_io();
500 let mut sstable_iter = SstableStreamIterator::new(
501 sstable,
502 block_metas_range,
503 table_info.clone(),
504 &self.stats,
505 self.task_progress.clone(),
506 self.sstable_store.clone(),
507 self.max_io_retry_times,
508 compact_sstable_table_ids,
509 );
510 sstable_iter.seek(seek_key).await?;
511
512 if sstable_iter.is_valid() {
513 self.sstable_iter = Some(sstable_iter);
514 } else {
515 found = false;
516 }
517 }
518
519 if found {
520 return Ok(());
521 } else {
522 self.cur_idx += 1;
523 seek_key = None;
524 }
525 }
526 Ok(())
527 }
528}
529
530impl HummockIterator for ConcatSstableIterator {
531 type Direction = Forward;
532
533 async fn next(&mut self) -> HummockResult<()> {
534 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
535
536 sstable_iter.next().await?;
538 if sstable_iter.is_valid() {
539 Ok(())
540 } else {
541 self.seek_idx(self.cur_idx + 1, None).await?;
543 Ok(())
544 }
545 }
546
547 fn key(&self) -> FullKey<&[u8]> {
548 self.sstable_iter.as_ref().expect("no table iter").key()
549 }
550
551 fn value(&self) -> HummockValue<&[u8]> {
552 self.sstable_iter.as_ref().expect("no table iter").value()
553 }
554
555 fn is_valid(&self) -> bool {
556 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
557 }
558
559 async fn rewind(&mut self) -> HummockResult<()> {
560 self.seek_idx(0, None).await
561 }
562
563 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
565 let seek_key = if self.key_range.left.is_empty() {
566 key
567 } else {
568 match key.cmp(&FullKey::decode(&self.key_range.left)) {
569 Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
570 Ordering::Greater => key,
571 }
572 };
573 let table_idx = self.sstables.partition_point(|table| {
574 let max_sst_key = &table.key_range.right;
581 FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
582 });
583
584 self.seek_idx(table_idx, Some(key)).await
585 }
586
587 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
588 stats.add(&self.stats)
589 }
590
591 fn value_meta(&self) -> ValueMeta {
592 let iter = self.sstable_iter.as_ref().expect("no table iter");
593 assert!(iter.block_idx >= 1);
596 let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
598 ValueMeta {
599 object_id: Some(iter.sstable_info.object_id),
600 block_id: Some(absolute_block_idx as u64),
601 }
602 }
603}
604
/// Wraps a forward iterator and reports the number of processed keys to a `TaskProgress`.
pub struct MonitoredCompactorIterator<I> {
    /// The wrapped iterator; all calls are forwarded to it.
    inner: I,
    /// Receives a progress update every `PROGRESS_KEY_INTERVAL` keys.
    task_progress: Arc<TaskProgress>,

    /// Number of successful `next` calls since construction or the last `rewind`/`seek`.
    processed_key_num: usize,
}
611
impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
    /// Wraps `inner`, reporting progress to `task_progress`. The processed-key counter starts
    /// at zero.
    pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
        Self {
            inner,
            task_progress,
            processed_key_num: 0,
        }
    }
}
621
622impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
623 type Direction = Forward;
624
625 async fn next(&mut self) -> HummockResult<()> {
626 self.inner.next().await?;
627 self.processed_key_num += 1;
628
629 if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
630 self.task_progress
631 .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
632 }
633
634 Ok(())
635 }
636
637 fn key(&self) -> FullKey<&[u8]> {
638 self.inner.key()
639 }
640
641 fn value(&self) -> HummockValue<&[u8]> {
642 self.inner.value()
643 }
644
645 fn is_valid(&self) -> bool {
646 self.inner.is_valid()
647 }
648
649 async fn rewind(&mut self) -> HummockResult<()> {
650 self.processed_key_num = 0;
651 self.inner.rewind().await?;
652 Ok(())
653 }
654
655 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
656 self.processed_key_num = 0;
657 self.inner.seek(key).await?;
658 Ok(())
659 }
660
661 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
662 self.inner.collect_local_statistic(stats)
663 }
664
665 fn value_meta(&self) -> ValueMeta {
666 self.inner.value_meta()
667 }
668}
669
670pub(crate) fn filter_block_metas(
671 block_metas: &[BlockMeta],
672 compact_table_ids: &HashSet<TableId>,
673 key_range: KeyRange,
674) -> Range<usize> {
675 if block_metas.is_empty() {
676 return 0..0;
677 }
678
679 let mut start_index = if key_range.left.is_empty() {
680 0
681 } else {
682 block_metas
684 .partition_point(|block| {
685 KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
686 != Ordering::Less
687 })
688 .saturating_sub(1)
689 };
690
691 let mut end_index = if key_range.right.is_empty() {
692 block_metas.len()
693 } else {
694 let ret = block_metas.partition_point(|block| {
695 KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
696 != Ordering::Greater
697 });
698
699 if ret == 0 {
700 return 0..0;
702 }
703
704 ret
705 }
706 .saturating_sub(1);
707
708 while start_index <= end_index {
710 let start_block_table_id = block_metas[start_index].table_id();
711 if compact_table_ids.contains(&start_block_table_id) {
712 break;
713 }
714
715 let old_start_index = start_index;
717 let block_metas_to_search = &block_metas[start_index..=end_index];
718
719 start_index += block_metas_to_search
720 .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
721
722 if old_start_index == start_index {
723 break;
725 }
726 }
727
728 while start_index <= end_index {
729 let end_block_table_id = block_metas[end_index].table_id();
730 if compact_table_ids.contains(&end_block_table_id) {
731 break;
732 }
733
734 let old_end_index = end_index;
735 let block_metas_to_search = &block_metas[start_index..=end_index];
736
737 end_index = start_index
738 + block_metas_to_search
739 .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
740 .saturating_sub(1);
741
742 if end_index == old_end_index {
743 break;
745 }
746 }
747
748 if start_index > end_index {
749 return 0..0;
750 }
751
752 start_index..(end_index + 1)
753}
754
755#[cfg(test)]
756mod tests {
757 use std::cmp::Ordering;
758 use std::collections::HashSet;
759
760 use risingwave_common::catalog::TableId;
761 use risingwave_common::util::epoch::test_epoch;
762 use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
763 use risingwave_hummock_sdk::key_range::KeyRange;
764 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
765
766 use crate::hummock::BlockMeta;
767 use crate::hummock::compactor::ConcatSstableIterator;
768 use crate::hummock::iterator::test_utils::mock_sstable_store;
769 use crate::hummock::iterator::{HummockIterator, MergeIterator};
770 use crate::hummock::test_utils::{
771 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info,
772 gen_test_sstable_with_table_ids, test_key_of, test_value_of,
773 };
774 use crate::hummock::value::HummockValue;
775
776 #[tokio::test]
777 async fn test_concat_iterator() {
778 let sstable_store = mock_sstable_store().await;
779 let mut table_infos = vec![];
780 for object_id in 0..3 {
781 let start_index = object_id * TEST_KEYS_COUNT;
782 let end_index = (object_id + 1) * TEST_KEYS_COUNT;
783 let table_info = gen_test_sstable_info(
784 default_builder_opt_for_test(),
785 object_id as u64,
786 (start_index..end_index)
787 .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
788 sstable_store.clone(),
789 )
790 .await;
791 table_infos.push(table_info);
792 }
793 let start_index = 5000;
794 let end_index = 25000;
795
796 let kr = KeyRange::new(
797 test_key_of(start_index).encode().into(),
798 test_key_of(end_index).encode().into(),
799 );
800 let mut iter = ConcatSstableIterator::for_test(
801 vec![0],
802 table_infos.clone(),
803 kr.clone(),
804 sstable_store.clone(),
805 );
806 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
807
808 for idx in start_index..end_index {
809 let key = iter.key();
810 let val = iter.value();
811 assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
812 assert_eq!(
813 val.into_user_value().unwrap(),
814 test_value_of(idx).as_slice()
815 );
816 iter.next().await.unwrap();
817 }
818
819 let kr = KeyRange::new(
821 test_key_of(30000).encode().into(),
822 test_key_of(40000).encode().into(),
823 );
824 let mut iter = ConcatSstableIterator::for_test(
825 vec![0],
826 table_infos.clone(),
827 kr.clone(),
828 sstable_store.clone(),
829 );
830 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
831 assert!(!iter.is_valid());
832 let kr = KeyRange::new(
833 test_key_of(start_index).encode().into(),
834 test_key_of(40000).encode().into(),
835 );
836 let mut iter = ConcatSstableIterator::for_test(
837 vec![0],
838 table_infos.clone(),
839 kr.clone(),
840 sstable_store.clone(),
841 );
842 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
843 for idx in start_index..30000 {
844 let key = iter.key();
845 let val = iter.value();
846 assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
847 assert_eq!(
848 val.into_user_value().unwrap(),
849 test_value_of(idx).as_slice()
850 );
851 iter.next().await.unwrap();
852 }
853 assert!(!iter.is_valid());
854
855 let kr = KeyRange::new(
857 test_key_of(0).encode().into(),
858 test_key_of(40000).encode().into(),
859 );
860 let mut iter = ConcatSstableIterator::for_test(
861 vec![0],
862 table_infos.clone(),
863 kr.clone(),
864 sstable_store.clone(),
865 );
866 iter.seek(test_key_of(10000).to_ref()).await.unwrap();
867 assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
868 iter.seek(test_key_of(10001).to_ref()).await.unwrap();
869 assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
870 iter.seek(test_key_of(9999).to_ref()).await.unwrap();
871 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
872 iter.seek(test_key_of(1).to_ref()).await.unwrap();
873 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
874 iter.seek(test_key_of(29999).to_ref()).await.unwrap();
875 assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
876 iter.seek(test_key_of(30000).to_ref()).await.unwrap();
877 assert!(!iter.is_valid());
878
879 let kr = KeyRange::new(
881 test_key_of(6000).encode().into(),
882 test_key_of(16000).encode().into(),
883 );
884 let mut iter = ConcatSstableIterator::for_test(
885 vec![0],
886 table_infos.clone(),
887 kr.clone(),
888 sstable_store.clone(),
889 );
890 iter.seek(test_key_of(17000).to_ref()).await.unwrap();
891 assert!(!iter.is_valid());
892 iter.seek(test_key_of(1).to_ref()).await.unwrap();
893 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
894 }
895
896 #[tokio::test]
897 async fn test_concat_iterator_seek_idx() {
898 let sstable_store = mock_sstable_store().await;
899 let mut table_infos = vec![];
900 for object_id in 0..3 {
901 let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
902 let end_index = (object_id + 1) * TEST_KEYS_COUNT;
903 let table_info = gen_test_sstable_info(
904 default_builder_opt_for_test(),
905 object_id as u64,
906 (start_index..end_index)
907 .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
908 sstable_store.clone(),
909 )
910 .await;
911 table_infos.push(table_info);
912 }
913
914 let kr = KeyRange::new(
916 test_key_of(0).encode().into(),
917 test_key_of(40000).encode().into(),
918 );
919 let mut iter = ConcatSstableIterator::for_test(
920 vec![0],
921 table_infos.clone(),
922 kr.clone(),
923 sstable_store.clone(),
924 );
925 let sst = sstable_store
926 .sstable(&iter.sstables[0], &mut iter.stats)
927 .await
928 .unwrap();
929 let block_metas = &sst.meta.block_metas;
930 let block_1_smallest_key = block_metas[1].smallest_key.clone();
931 let block_2_smallest_key = block_metas[2].smallest_key.clone();
932 let seek_key = block_1_smallest_key.clone();
934 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
935 .await
936 .unwrap();
937 assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
938 let seek_key = prev_full_key(block_1_smallest_key.as_slice());
941 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
942 .await
943 .unwrap();
944 assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
945 iter.next().await.unwrap();
946 let block_1_second_key = iter.key().to_vec();
947 let seek_key = test_key_of(30001);
949 iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
950 .await
951 .unwrap();
952 assert!(!iter.is_valid());
953
954 let kr = KeyRange::new(
956 next_full_key(&block_1_smallest_key).into(),
957 prev_full_key(&block_2_smallest_key).into(),
958 );
959 let mut iter = ConcatSstableIterator::for_test(
960 vec![0],
961 table_infos.clone(),
962 kr.clone(),
963 sstable_store.clone(),
964 );
965 let seek_key = FullKey::decode(&block_2_smallest_key);
967 assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
968 iter.seek_idx(0, Some(seek_key)).await.unwrap();
969 assert!(!iter.is_valid());
970 let seek_key = test_key_of(0).encode();
972 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
973 .await
974 .unwrap();
975 assert!(iter.is_valid());
976 assert_eq!(iter.key(), block_1_second_key.to_ref());
977
978 iter.seek_idx(0, None).await.unwrap();
980 assert!(iter.is_valid());
981 assert_eq!(iter.key(), block_1_second_key.to_ref());
982 }
983
984 #[tokio::test]
985 async fn test_filter_block_metas() {
986 use crate::hummock::compactor::iterator::filter_block_metas;
987
988 {
989 let block_metas = Vec::default();
990
991 let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
992
993 assert!(ret.is_empty());
994 }
995
996 {
997 let block_metas = vec![
998 BlockMeta {
999 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1000 ..Default::default()
1001 },
1002 BlockMeta {
1003 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1004 ..Default::default()
1005 },
1006 BlockMeta {
1007 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1008 ..Default::default()
1009 },
1010 ];
1011
1012 let ret = filter_block_metas(
1013 &block_metas,
1014 &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
1015 KeyRange::default(),
1016 );
1017 let ret = &block_metas[ret];
1018
1019 assert_eq!(3, ret.len());
1020 assert_eq!(
1021 1,
1022 FullKey::decode(&ret[0].smallest_key)
1023 .user_key
1024 .table_id
1025 .as_raw_id()
1026 );
1027 assert_eq!(
1028 3,
1029 FullKey::decode(&ret[2].smallest_key)
1030 .user_key
1031 .table_id
1032 .as_raw_id()
1033 );
1034 }
1035
1036 {
1037 let block_metas = vec![
1038 BlockMeta {
1039 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1040 ..Default::default()
1041 },
1042 BlockMeta {
1043 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1044 ..Default::default()
1045 },
1046 BlockMeta {
1047 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1048 ..Default::default()
1049 },
1050 ];
1051
1052 let ret = filter_block_metas(
1053 &block_metas,
1054 &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1055 KeyRange::default(),
1056 );
1057 let ret = &block_metas[ret];
1058
1059 assert_eq!(2, ret.len());
1060 assert_eq!(
1061 2,
1062 FullKey::decode(&ret[0].smallest_key)
1063 .user_key
1064 .table_id
1065 .as_raw_id()
1066 );
1067 assert_eq!(
1068 3,
1069 FullKey::decode(&ret[1].smallest_key)
1070 .user_key
1071 .table_id
1072 .as_raw_id()
1073 );
1074 }
1075
1076 {
1077 let block_metas = vec![
1078 BlockMeta {
1079 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1080 ..Default::default()
1081 },
1082 BlockMeta {
1083 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1084 ..Default::default()
1085 },
1086 BlockMeta {
1087 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1088 ..Default::default()
1089 },
1090 ];
1091
1092 let ret = filter_block_metas(
1093 &block_metas,
1094 &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1095 KeyRange::default(),
1096 );
1097 let ret = &block_metas[ret];
1098
1099 assert_eq!(2, ret.len());
1100 assert_eq!(
1101 1,
1102 FullKey::decode(&ret[0].smallest_key)
1103 .user_key
1104 .table_id
1105 .as_raw_id()
1106 );
1107 assert_eq!(
1108 2,
1109 FullKey::decode(&ret[1].smallest_key)
1110 .user_key
1111 .table_id
1112 .as_raw_id()
1113 );
1114 }
1115
1116 {
1117 let block_metas = vec![
1118 BlockMeta {
1119 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1120 ..Default::default()
1121 },
1122 BlockMeta {
1123 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1124 ..Default::default()
1125 },
1126 BlockMeta {
1127 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1128 ..Default::default()
1129 },
1130 ];
1131 let ret = filter_block_metas(
1132 &block_metas,
1133 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1134 KeyRange::default(),
1135 );
1136 let ret = &block_metas[ret];
1137
1138 assert_eq!(1, ret.len());
1139 assert_eq!(
1140 2,
1141 FullKey::decode(&ret[0].smallest_key)
1142 .user_key
1143 .table_id
1144 .as_raw_id()
1145 );
1146 }
1147
1148 {
1149 let block_metas = vec![
1150 BlockMeta {
1151 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1152 ..Default::default()
1153 },
1154 BlockMeta {
1155 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1156 ..Default::default()
1157 },
1158 BlockMeta {
1159 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1160 ..Default::default()
1161 },
1162 BlockMeta {
1163 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1164 ..Default::default()
1165 },
1166 BlockMeta {
1167 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1168 ..Default::default()
1169 },
1170 ];
1171 let ret = filter_block_metas(
1172 &block_metas,
1173 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1174 KeyRange::default(),
1175 );
1176 let ret = &block_metas[ret];
1177
1178 assert_eq!(1, ret.len());
1179 assert_eq!(
1180 2,
1181 FullKey::decode(&ret[0].smallest_key)
1182 .user_key
1183 .table_id
1184 .as_raw_id()
1185 );
1186 }
1187
1188 {
1189 let block_metas = vec![
1190 BlockMeta {
1191 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1192 ..Default::default()
1193 },
1194 BlockMeta {
1195 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1196 ..Default::default()
1197 },
1198 BlockMeta {
1199 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1200 ..Default::default()
1201 },
1202 BlockMeta {
1203 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1204 ..Default::default()
1205 },
1206 BlockMeta {
1207 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1208 ..Default::default()
1209 },
1210 ];
1211
1212 let ret = filter_block_metas(
1213 &block_metas,
1214 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1215 KeyRange::default(),
1216 );
1217 let ret = &block_metas[ret];
1218
1219 assert_eq!(1, ret.len());
1220 assert_eq!(
1221 2,
1222 FullKey::decode(&ret[0].smallest_key)
1223 .user_key
1224 .table_id
1225 .as_raw_id()
1226 );
1227 }
1228
1229 {
1230 let block_metas = vec![
1231 BlockMeta {
1232 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1233 ..Default::default()
1234 },
1235 BlockMeta {
1236 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1237 ..Default::default()
1238 },
1239 BlockMeta {
1240 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1241 ..Default::default()
1242 },
1243 ];
1244
1245 let ret = filter_block_metas(
1246 &block_metas,
1247 &HashSet::from_iter(vec![1_u32.into(), 3_u32.into()].into_iter()),
1248 KeyRange::default(),
1249 );
1250 let ret = &block_metas[ret];
1251
1252 assert_eq!(3, ret.len());
1253 assert_eq!(
1254 1,
1255 FullKey::decode(&ret[0].smallest_key)
1256 .user_key
1257 .table_id
1258 .as_raw_id()
1259 );
1260 assert_eq!(
1261 2,
1262 FullKey::decode(&ret[1].smallest_key)
1263 .user_key
1264 .table_id
1265 .as_raw_id()
1266 );
1267 assert_eq!(
1268 3,
1269 FullKey::decode(&ret[2].smallest_key)
1270 .user_key
1271 .table_id
1272 .as_raw_id()
1273 );
1274 }
1275 }
1276
1277 #[tokio::test]
1278 async fn test_iterator_same_obj() {
1279 let sstable_store = mock_sstable_store().await;
1280
1281 let table_info = gen_test_sstable_info(
1282 default_builder_opt_for_test(),
1283 1_u64,
1284 (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1285 sstable_store.clone(),
1286 )
1287 .await;
1288
1289 let split_key = test_key_of(5000).encode();
1290 let sst_1: SstableInfo = SstableInfoInner {
1291 key_range: KeyRange {
1292 left: table_info.key_range.left.clone(),
1293 right: split_key.clone().into(),
1294 right_exclusive: true,
1295 },
1296 ..table_info.get_inner()
1297 }
1298 .into();
1299
1300 let total_key_count = sst_1.total_key_count;
1301 let sst_2: SstableInfo = SstableInfoInner {
1302 sst_id: sst_1.sst_id + 1,
1303 key_range: KeyRange {
1304 left: split_key.clone().into(),
1305 right: table_info.key_range.right.clone(),
1306 right_exclusive: table_info.key_range.right_exclusive,
1307 },
1308 ..table_info.get_inner()
1309 }
1310 .into();
1311
1312 {
1313 let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1315
1316 let mut iter = ConcatSstableIterator::for_test(
1317 vec![0],
1318 vec![sst_1.clone(), sst_2.clone()],
1319 KeyRange::default(),
1320 sstable_store.clone(),
1321 );
1322
1323 iter.rewind().await.unwrap();
1324
1325 let mut key_count = 0;
1326 while iter.is_valid() {
1327 let is_new_user_key = full_key_tracker.observe(iter.key());
1328 assert!(is_new_user_key);
1329 key_count += 1;
1330 iter.next().await.unwrap();
1331 }
1332
1333 assert_eq!(total_key_count, key_count);
1334 }
1335
1336 {
1337 let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1338 let concat_1 = ConcatSstableIterator::for_test(
1339 vec![0],
1340 vec![sst_1.clone()],
1341 KeyRange::default(),
1342 sstable_store.clone(),
1343 );
1344
1345 let concat_2 = ConcatSstableIterator::for_test(
1346 vec![0],
1347 vec![sst_2.clone()],
1348 KeyRange::default(),
1349 sstable_store.clone(),
1350 );
1351
1352 let mut key_count = 0;
1353 let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1354 iter.rewind().await.unwrap();
1355 while iter.is_valid() {
1356 full_key_tracker.observe(iter.key());
1357 key_count += 1;
1358 iter.next().await.unwrap();
1359 }
1360 assert_eq!(total_key_count, key_count);
1361 }
1362 }
1363
1364 #[tokio::test]
1365 async fn test_concat_iterator_skips_hole_table_blocks() {
1366 let sstable_store = mock_sstable_store().await;
1367
1368 let key_1 = FullKey::for_test(TableId::new(1), b"a".to_vec(), test_epoch(1));
1369 let key_2 = FullKey::for_test(TableId::new(2), b"b".to_vec(), test_epoch(1));
1370 let key_3 = FullKey::for_test(TableId::new(3), b"c".to_vec(), test_epoch(1));
1371 let kv_pairs = vec![
1372 (key_1.clone(), HummockValue::put(b"value-1".to_vec())),
1373 (key_2.clone(), HummockValue::put(b"value-2".to_vec())),
1374 (key_3.clone(), HummockValue::put(b"value-3".to_vec())),
1375 ];
1376
1377 let (_sstable, table_info) = gen_test_sstable_with_table_ids(
1378 default_builder_opt_for_test(),
1379 10,
1380 kv_pairs.into_iter(),
1381 sstable_store.clone(),
1382 vec![1, 2, 3],
1383 )
1384 .await;
1385
1386 let table_info: SstableInfo = SstableInfoInner {
1387 table_ids: vec![1.into(), 3.into()],
1388 ..table_info.get_inner()
1389 }
1390 .into();
1391
1392 let mut iter = ConcatSstableIterator::for_test(
1393 vec![1, 3],
1394 vec![table_info.clone()],
1395 KeyRange::default(),
1396 sstable_store.clone(),
1397 );
1398 iter.rewind().await.unwrap();
1399 assert!(iter.is_valid());
1400 assert_eq!(iter.key(), key_1.to_ref());
1401
1402 iter.next().await.unwrap();
1403 assert!(iter.is_valid());
1404 assert_eq!(iter.key(), key_3.to_ref());
1405
1406 iter.next().await.unwrap();
1407 assert!(!iter.is_valid());
1408
1409 let mut iter = ConcatSstableIterator::for_test(
1410 vec![1, 3],
1411 vec![table_info],
1412 KeyRange::default(),
1413 sstable_store,
1414 );
1415 iter.seek(key_2.to_ref()).await.unwrap();
1416 assert!(iter.is_valid());
1417 assert_eq!(iter.key(), key_3.to_ref());
1418 }
1419}