1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::ops::Range;
18use std::sync::atomic::AtomicU64;
19use std::sync::{Arc, atomic};
20use std::time::Instant;
21
22use await_tree::{InstrumentAwait, SpanExt};
23use fail::fail_point;
24use risingwave_common::catalog::TableId;
25use risingwave_hummock_sdk::KeyComparator;
26use risingwave_hummock_sdk::compaction_group::StateTableId;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30
31use crate::hummock::block_stream::BlockDataStream;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult, TableHolder};
37use crate::monitor::StoreLocalStatistic;
38
/// Number of keys `MonitoredCompactorIterator` processes between two consecutive
/// `inc_progress_key` reports to the task progress tracker.
const PROGRESS_KEY_INTERVAL: usize = 100;
40
/// Iterates over the key-value pairs of one SST by pulling its blocks from a block data stream,
/// one block at a time.
pub struct SstableStreamIterator {
    /// Store used to open the block data stream (see `create_stream`).
    sstable_store: SstableStoreRef,
    /// Loaded SST; its `meta.block_metas` are what `block_metas_range` indexes into.
    sstable: TableHolder,
    /// Sub-range of `sstable.meta.block_metas` this iterator reads. Narrowed in `new` by
    /// `filter_block_metas`.
    block_metas_range: Range<usize>,
    /// Stream of block data. Created lazily by `create_stream` and reset to `None` when a
    /// retryable I/O error occurs.
    block_stream: Option<BlockDataStream>,

    /// Iterator over the most recently loaded block. `None` before the first block is loaded
    /// and once iteration has ended.
    block_iter: Option<BlockIterator>,

    /// Number of blocks loaded so far, relative to the start of `block_metas_range`. A newly
    /// (re)created stream starts at this offset.
    block_idx: usize,

    /// Counter cloned from `StoreLocalStatistic::remote_io_time`; `next_block` adds the
    /// milliseconds it spent to it.
    stats_ptr: Arc<AtomicU64>,

    /// Metadata of the SST being read (object id, table ids, key range, ...).
    sstable_info: SstableInfo,

    /// Table ids listed in `sstable_info.table_ids`; blocks of any other table are skipped.
    sstable_table_ids: HashSet<StateTableId>,
    /// Progress tracker; `dec_num_pending_read_io` is called on it when the iterator is dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream has been dropped for re-creation after an object error.
    io_retry_times: usize,
    /// Upper limit for `io_retry_times`.
    max_io_retry_times: usize,

    /// Decoded `sstable_info.key_range.left`; keys before it are never returned.
    key_range_left: FullKey<Vec<u8>>,
    /// Decoded `sstable_info.key_range.right`.
    key_range_right: FullKey<Vec<u8>>,
    /// Whether `key_range_right` itself is excluded from the range.
    key_range_right_exclusive: bool,
}
75
76impl SstableStreamIterator {
77 pub fn new(
92 sstable: TableHolder,
93 block_metas_range: Range<usize>,
94 sstable_info: SstableInfo,
95 stats: &StoreLocalStatistic,
96 task_progress: Arc<TaskProgress>,
97 sstable_store: SstableStoreRef,
98 max_io_retry_times: usize,
99 ) -> Self {
100 let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
101
102 let block_metas_range = {
105 let block_metas = &sstable.meta.block_metas[block_metas_range.clone()];
106 let inner_range = filter_block_metas(
107 block_metas,
108 &sstable_table_ids,
109 sstable_info.key_range.clone(),
110 );
111 (block_metas_range.start + inner_range.start)
113 ..(block_metas_range.start + inner_range.end)
114 };
115
116 let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
117 let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
118 let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
119
120 Self {
121 block_stream: None,
122 block_iter: None,
123 sstable,
124 block_metas_range,
125 block_idx: 0,
126 stats_ptr: stats.remote_io_time.clone(),
127 sstable_table_ids,
128 sstable_info,
129 sstable_store,
130 task_progress,
131 io_retry_times: 0,
132 max_io_retry_times,
133 key_range_left,
134 key_range_right,
135 key_range_right_exclusive,
136 }
137 }
138
139 #[inline]
141 fn block_metas(&self) -> &[BlockMeta] {
142 &self.sstable.meta.block_metas[self.block_metas_range.clone()]
143 }
144
145 #[inline]
147 fn block_count(&self) -> usize {
148 self.block_metas_range.len()
149 }
150
151 async fn create_stream(&mut self) -> HummockResult<()> {
152 let block_stream = self
153 .sstable_store
154 .get_stream_for_blocks(
155 self.sstable_info.object_id,
156 &self.block_metas()[self.block_idx..],
157 )
158 .instrument_await("stream_iter_get_stream".verbose())
159 .await?;
160 self.block_stream = Some(block_stream);
161 Ok(())
162 }
163
164 async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
165 while let Some(block_iter) = self.block_iter.as_mut() {
166 if self.sstable_table_ids.contains(&block_iter.table_id()) {
167 return Ok(());
168 } else {
169 self.next_block().await?;
170 }
171 }
172 Ok(())
173 }
174
175 pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
180 self.next_block().await?;
182
183 let seek_key = if let Some(seek_key) = seek_key {
188 if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
189 Some(self.key_range_left.to_ref())
190 } else {
191 Some(seek_key)
192 }
193 } else {
194 Some(self.key_range_left.to_ref())
195 };
196
197 if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
198 block_iter.seek(seek_key);
199
200 if !block_iter.is_valid() {
201 self.next_block().await?;
203 }
204 }
205
206 self.prune_from_valid_block_iter().await?;
207 Ok(())
208 }
209
210 async fn next_block(&mut self) -> HummockResult<()> {
215 let now = Instant::now();
217 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
218 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
219 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
220 });
221 if self.block_idx < self.block_count() {
222 loop {
223 let ret = match &mut self.block_stream {
224 Some(block_stream) => block_stream.next_block().await,
225 None => {
226 self.create_stream().await?;
227 continue;
228 }
229 };
230 match ret {
231 Ok(Some(block)) => {
232 let mut block_iter =
233 BlockIterator::new(BlockHolder::from_owned_block(block));
234 block_iter.seek_to_first();
235 self.block_idx += 1;
236 self.block_iter = Some(block_iter);
237 return Ok(());
238 }
239 Ok(None) => break,
240 Err(e) => {
241 if !e.is_object_error() || !self.need_recreate_io_stream() {
242 return Err(e);
243 }
244 self.block_stream.take();
245 self.io_retry_times += 1;
246 fail_point!("create_stream_err");
247
248 tracing::warn!(
249 "retry create stream for sstable {} times, sstinfo={}",
250 self.io_retry_times,
251 self.sst_debug_info()
252 );
253 }
254 }
255 }
256 }
257 self.block_idx = self.block_count();
258 self.block_iter = None;
259
260 Ok(())
261 }
262
263 pub async fn next(&mut self) -> HummockResult<()> {
270 if !self.is_valid() {
271 return Ok(());
272 }
273
274 let block_iter = self.block_iter.as_mut().expect("no block iter");
275 block_iter.next();
276 if !block_iter.is_valid() {
277 self.next_block().await?;
278 self.prune_from_valid_block_iter().await?;
279 }
280
281 if !self.is_valid() {
282 return Ok(());
283 }
284
285 let key = self
287 .block_iter
288 .as_ref()
289 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
290 .key();
291
292 if self.exceed_key_range_right(key) {
293 self.block_iter = None;
294 }
295
296 Ok(())
297 }
298
299 pub fn key(&self) -> FullKey<&[u8]> {
300 let key = self
301 .block_iter
302 .as_ref()
303 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
304 .key();
305
306 assert!(
307 !self.exceed_key_range_left(key),
308 "key {:?} key_range_left {:?}",
309 key,
310 self.key_range_left.to_ref()
311 );
312
313 assert!(
314 !self.exceed_key_range_right(key),
315 "key {:?} key_range_right {:?} key_range_right_exclusive {}",
316 key,
317 self.key_range_right.to_ref(),
318 self.key_range_right_exclusive
319 );
320
321 key
322 }
323
324 pub fn value(&self) -> HummockValue<&[u8]> {
325 let raw_value = self
326 .block_iter
327 .as_ref()
328 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
329 .value();
330 HummockValue::from_slice(raw_value)
331 .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
332 }
333
334 pub fn is_valid(&self) -> bool {
335 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
337 }
338
339 fn sst_debug_info(&self) -> String {
340 format!(
341 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
342 self.sstable_info.object_id,
343 self.sstable_info.sst_id,
344 self.sstable_info.meta_offset,
345 self.sstable_info.table_ids
346 )
347 }
348
349 fn need_recreate_io_stream(&self) -> bool {
350 self.io_retry_times < self.max_io_retry_times
351 }
352
353 fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
354 key.cmp(&self.key_range_left.to_ref()).is_lt()
355 }
356
357 fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
358 if self.key_range_right_exclusive {
359 key.cmp(&self.key_range_right.to_ref()).is_ge()
360 } else {
361 key.cmp(&self.key_range_right.to_ref()).is_gt()
362 }
363 }
364}
365
366impl Drop for SstableStreamIterator {
367 fn drop(&mut self) {
368 self.task_progress.dec_num_pending_read_io()
369 }
370}
371
/// Iterates over a list of SSTs one after another, restricted to a key range and to a set of
/// table ids.
// NOTE(review): `seek` binary-searches `sstables` by their right key bound, so the list is
// presumably sorted by key range and non-overlapping; confirm against the callers.
pub struct ConcatSstableIterator {
    /// Key range the iteration is restricted to. An empty `left` or `right` is treated as
    /// "no bound" on that side.
    key_range: KeyRange,

    /// Iterator over the SST at `cur_idx`. `None` when no valid position has been found.
    sstable_iter: Option<SstableStreamIterator>,

    /// Index into `sstables` of the SST currently being read.
    cur_idx: usize,

    /// The SSTs to iterate over.
    sstables: Vec<SstableInfo>,

    /// Table ids to keep; SSTs and blocks that only hold other tables are skipped.
    existing_table_ids: HashSet<StateTableId>,

    /// Store used to load SST metadata and to stream blocks.
    sstable_store: SstableStoreRef,

    /// Local statistics, handed out through `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Progress tracker; the pending read I/O count is incremented for every SST iterator
    /// that gets created.
    task_progress: Arc<TaskProgress>,
    /// Retry limit passed on to every `SstableStreamIterator`.
    max_io_retry_times: usize,
}
396
397impl ConcatSstableIterator {
398 pub fn new(
402 existing_table_ids: Vec<StateTableId>,
403 sst_infos: Vec<SstableInfo>,
404 key_range: KeyRange,
405 sstable_store: SstableStoreRef,
406 task_progress: Arc<TaskProgress>,
407 max_io_retry_times: usize,
408 ) -> Self {
409 Self {
410 key_range,
411 sstable_iter: None,
412 cur_idx: 0,
413 sstables: sst_infos,
414 existing_table_ids: HashSet::from_iter(existing_table_ids),
415 sstable_store,
416 task_progress,
417 stats: StoreLocalStatistic::default(),
418 max_io_retry_times,
419 }
420 }
421
/// Test-only constructor: uses a default `TaskProgress` and allows no I/O retries.
#[cfg(test)]
pub fn for_test(
    existing_table_ids: Vec<impl Into<StateTableId>>,
    sst_infos: Vec<SstableInfo>,
    key_range: KeyRange,
    sstable_store: SstableStoreRef,
) -> Self {
    let table_ids: Vec<StateTableId> = existing_table_ids.into_iter().map(Into::into).collect();
    let task_progress = Arc::new(TaskProgress::default());
    let max_io_retry_times = 0;

    Self::new(
        table_ids,
        sst_infos,
        key_range,
        sstable_store,
        task_progress,
        max_io_retry_times,
    )
}
438
439 async fn seek_idx(
441 &mut self,
442 idx: usize,
443 seek_key: Option<FullKey<&[u8]>>,
444 ) -> HummockResult<()> {
445 self.sstable_iter.take();
446 let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
447 {
448 (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
449 Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
450 Ordering::Greater => Some(seek_key),
451 },
452 (Some(seek_key), true) => Some(seek_key),
453 (None, true) => None,
454 (None, false) => Some(FullKey::decode(&self.key_range.left)),
455 };
456
457 self.cur_idx = idx;
458 while self.cur_idx < self.sstables.len() {
459 let table_info = &self.sstables[self.cur_idx];
460 let mut found = table_info
461 .table_ids
462 .iter()
463 .any(|table_id| self.existing_table_ids.contains(table_id));
464 if !found {
465 self.cur_idx += 1;
466 seek_key = None;
467 continue;
468 }
469 let sstable = self
470 .sstable_store
471 .sstable(table_info, &mut self.stats)
472 .instrument_await("stream_iter_sstable".verbose())
473 .await?;
474
475 let filter_key_range = match seek_key {
476 Some(seek_key) => {
477 KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
478 }
479 None => self.key_range.clone(),
480 };
481
482 let block_metas_range = filter_block_metas(
483 &sstable.meta.block_metas,
484 &self.existing_table_ids,
485 filter_key_range,
486 );
487
488 if block_metas_range.is_empty() {
489 found = false;
490 } else {
491 self.task_progress.inc_num_pending_read_io();
492 let mut sstable_iter = SstableStreamIterator::new(
493 sstable,
494 block_metas_range,
495 table_info.clone(),
496 &self.stats,
497 self.task_progress.clone(),
498 self.sstable_store.clone(),
499 self.max_io_retry_times,
500 );
501 sstable_iter.seek(seek_key).await?;
502
503 if sstable_iter.is_valid() {
504 self.sstable_iter = Some(sstable_iter);
505 } else {
506 found = false;
507 }
508 }
509
510 if found {
511 return Ok(());
512 } else {
513 self.cur_idx += 1;
514 seek_key = None;
515 }
516 }
517 Ok(())
518 }
519}
520
521impl HummockIterator for ConcatSstableIterator {
522 type Direction = Forward;
523
524 async fn next(&mut self) -> HummockResult<()> {
525 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
526
527 sstable_iter.next().await?;
529 if sstable_iter.is_valid() {
530 Ok(())
531 } else {
532 self.seek_idx(self.cur_idx + 1, None).await?;
534 Ok(())
535 }
536 }
537
538 fn key(&self) -> FullKey<&[u8]> {
539 self.sstable_iter.as_ref().expect("no table iter").key()
540 }
541
542 fn value(&self) -> HummockValue<&[u8]> {
543 self.sstable_iter.as_ref().expect("no table iter").value()
544 }
545
546 fn is_valid(&self) -> bool {
547 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
548 }
549
550 async fn rewind(&mut self) -> HummockResult<()> {
551 self.seek_idx(0, None).await
552 }
553
554 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
556 let seek_key = if self.key_range.left.is_empty() {
557 key
558 } else {
559 match key.cmp(&FullKey::decode(&self.key_range.left)) {
560 Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
561 Ordering::Greater => key,
562 }
563 };
564 let table_idx = self.sstables.partition_point(|table| {
565 let max_sst_key = &table.key_range.right;
572 FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
573 });
574
575 self.seek_idx(table_idx, Some(key)).await
576 }
577
578 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
579 stats.add(&self.stats)
580 }
581
582 fn value_meta(&self) -> ValueMeta {
583 let iter = self.sstable_iter.as_ref().expect("no table iter");
584 assert!(iter.block_idx >= 1);
587 let absolute_block_idx = iter.block_metas_range.start + iter.block_idx - 1;
589 ValueMeta {
590 object_id: Some(iter.sstable_info.object_id),
591 block_id: Some(absolute_block_idx as u64),
592 }
593 }
594}
595
/// Wraps another iterator and reports the number of processed keys to a `TaskProgress` in
/// steps of `PROGRESS_KEY_INTERVAL`.
pub struct MonitoredCompactorIterator<I> {
    /// The wrapped iterator; every call is forwarded to it.
    inner: I,
    /// Receiver of the progress reports.
    task_progress: Arc<TaskProgress>,

    /// Number of `next` calls that succeeded since construction or since the last `rewind` or
    /// `seek`.
    processed_key_num: usize,
}
602
603impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
604 pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
605 Self {
606 inner,
607 task_progress,
608 processed_key_num: 0,
609 }
610 }
611}
612
613impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
614 type Direction = Forward;
615
616 async fn next(&mut self) -> HummockResult<()> {
617 self.inner.next().await?;
618 self.processed_key_num += 1;
619
620 if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
621 self.task_progress
622 .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
623 }
624
625 Ok(())
626 }
627
628 fn key(&self) -> FullKey<&[u8]> {
629 self.inner.key()
630 }
631
632 fn value(&self) -> HummockValue<&[u8]> {
633 self.inner.value()
634 }
635
636 fn is_valid(&self) -> bool {
637 self.inner.is_valid()
638 }
639
640 async fn rewind(&mut self) -> HummockResult<()> {
641 self.processed_key_num = 0;
642 self.inner.rewind().await?;
643 Ok(())
644 }
645
646 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
647 self.processed_key_num = 0;
648 self.inner.seek(key).await?;
649 Ok(())
650 }
651
652 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
653 self.inner.collect_local_statistic(stats)
654 }
655
656 fn value_meta(&self) -> ValueMeta {
657 self.inner.value_meta()
658 }
659}
660
661pub(crate) fn filter_block_metas(
662 block_metas: &[BlockMeta],
663 existing_table_ids: &HashSet<TableId>,
664 key_range: KeyRange,
665) -> Range<usize> {
666 if block_metas.is_empty() {
667 return 0..0;
668 }
669
670 let mut start_index = if key_range.left.is_empty() {
671 0
672 } else {
673 block_metas
675 .partition_point(|block| {
676 KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
677 != Ordering::Less
678 })
679 .saturating_sub(1)
680 };
681
682 let mut end_index = if key_range.right.is_empty() {
683 block_metas.len()
684 } else {
685 let ret = block_metas.partition_point(|block| {
686 KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
687 != Ordering::Greater
688 });
689
690 if ret == 0 {
691 return 0..0;
693 }
694
695 ret
696 }
697 .saturating_sub(1);
698
699 while start_index <= end_index {
701 let start_block_table_id = block_metas[start_index].table_id();
702 if existing_table_ids.contains(&start_block_table_id) {
703 break;
704 }
705
706 let old_start_index = start_index;
708 let block_metas_to_search = &block_metas[start_index..=end_index];
709
710 start_index += block_metas_to_search
711 .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
712
713 if old_start_index == start_index {
714 break;
716 }
717 }
718
719 while start_index <= end_index {
720 let end_block_table_id = block_metas[end_index].table_id();
721 if existing_table_ids.contains(&end_block_table_id) {
722 break;
723 }
724
725 let old_end_index = end_index;
726 let block_metas_to_search = &block_metas[start_index..=end_index];
727
728 end_index = start_index
729 + block_metas_to_search
730 .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
731 .saturating_sub(1);
732
733 if end_index == old_end_index {
734 break;
736 }
737 }
738
739 if start_index > end_index {
740 return 0..0;
741 }
742
743 start_index..(end_index + 1)
744}
745
746#[cfg(test)]
747mod tests {
748 use std::cmp::Ordering;
749 use std::collections::HashSet;
750
751 use risingwave_common::catalog::TableId;
752 use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
753 use risingwave_hummock_sdk::key_range::KeyRange;
754 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
755
756 use crate::hummock::BlockMeta;
757 use crate::hummock::compactor::ConcatSstableIterator;
758 use crate::hummock::iterator::test_utils::mock_sstable_store;
759 use crate::hummock::iterator::{HummockIterator, MergeIterator};
760 use crate::hummock::test_utils::{
761 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
762 test_value_of,
763 };
764 use crate::hummock::value::HummockValue;
765
766 #[tokio::test]
767 async fn test_concat_iterator() {
768 let sstable_store = mock_sstable_store().await;
769 let mut table_infos = vec![];
770 for object_id in 0..3 {
771 let start_index = object_id * TEST_KEYS_COUNT;
772 let end_index = (object_id + 1) * TEST_KEYS_COUNT;
773 let table_info = gen_test_sstable_info(
774 default_builder_opt_for_test(),
775 object_id as u64,
776 (start_index..end_index)
777 .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
778 sstable_store.clone(),
779 )
780 .await;
781 table_infos.push(table_info);
782 }
783 let start_index = 5000;
784 let end_index = 25000;
785
786 let kr = KeyRange::new(
787 test_key_of(start_index).encode().into(),
788 test_key_of(end_index).encode().into(),
789 );
790 let mut iter = ConcatSstableIterator::for_test(
791 vec![0],
792 table_infos.clone(),
793 kr.clone(),
794 sstable_store.clone(),
795 );
796 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
797
798 for idx in start_index..end_index {
799 let key = iter.key();
800 let val = iter.value();
801 assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
802 assert_eq!(
803 val.into_user_value().unwrap(),
804 test_value_of(idx).as_slice()
805 );
806 iter.next().await.unwrap();
807 }
808
809 let kr = KeyRange::new(
811 test_key_of(30000).encode().into(),
812 test_key_of(40000).encode().into(),
813 );
814 let mut iter = ConcatSstableIterator::for_test(
815 vec![0],
816 table_infos.clone(),
817 kr.clone(),
818 sstable_store.clone(),
819 );
820 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
821 assert!(!iter.is_valid());
822 let kr = KeyRange::new(
823 test_key_of(start_index).encode().into(),
824 test_key_of(40000).encode().into(),
825 );
826 let mut iter = ConcatSstableIterator::for_test(
827 vec![0],
828 table_infos.clone(),
829 kr.clone(),
830 sstable_store.clone(),
831 );
832 iter.seek(FullKey::decode(&kr.left)).await.unwrap();
833 for idx in start_index..30000 {
834 let key = iter.key();
835 let val = iter.value();
836 assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
837 assert_eq!(
838 val.into_user_value().unwrap(),
839 test_value_of(idx).as_slice()
840 );
841 iter.next().await.unwrap();
842 }
843 assert!(!iter.is_valid());
844
845 let kr = KeyRange::new(
847 test_key_of(0).encode().into(),
848 test_key_of(40000).encode().into(),
849 );
850 let mut iter = ConcatSstableIterator::for_test(
851 vec![0],
852 table_infos.clone(),
853 kr.clone(),
854 sstable_store.clone(),
855 );
856 iter.seek(test_key_of(10000).to_ref()).await.unwrap();
857 assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
858 iter.seek(test_key_of(10001).to_ref()).await.unwrap();
859 assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
860 iter.seek(test_key_of(9999).to_ref()).await.unwrap();
861 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
862 iter.seek(test_key_of(1).to_ref()).await.unwrap();
863 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
864 iter.seek(test_key_of(29999).to_ref()).await.unwrap();
865 assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
866 iter.seek(test_key_of(30000).to_ref()).await.unwrap();
867 assert!(!iter.is_valid());
868
869 let kr = KeyRange::new(
871 test_key_of(6000).encode().into(),
872 test_key_of(16000).encode().into(),
873 );
874 let mut iter = ConcatSstableIterator::for_test(
875 vec![0],
876 table_infos.clone(),
877 kr.clone(),
878 sstable_store.clone(),
879 );
880 iter.seek(test_key_of(17000).to_ref()).await.unwrap();
881 assert!(!iter.is_valid());
882 iter.seek(test_key_of(1).to_ref()).await.unwrap();
883 assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
884 }
885
886 #[tokio::test]
887 async fn test_concat_iterator_seek_idx() {
888 let sstable_store = mock_sstable_store().await;
889 let mut table_infos = vec![];
890 for object_id in 0..3 {
891 let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
892 let end_index = (object_id + 1) * TEST_KEYS_COUNT;
893 let table_info = gen_test_sstable_info(
894 default_builder_opt_for_test(),
895 object_id as u64,
896 (start_index..end_index)
897 .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
898 sstable_store.clone(),
899 )
900 .await;
901 table_infos.push(table_info);
902 }
903
904 let kr = KeyRange::new(
906 test_key_of(0).encode().into(),
907 test_key_of(40000).encode().into(),
908 );
909 let mut iter = ConcatSstableIterator::for_test(
910 vec![0],
911 table_infos.clone(),
912 kr.clone(),
913 sstable_store.clone(),
914 );
915 let sst = sstable_store
916 .sstable(&iter.sstables[0], &mut iter.stats)
917 .await
918 .unwrap();
919 let block_metas = &sst.meta.block_metas;
920 let block_1_smallest_key = block_metas[1].smallest_key.clone();
921 let block_2_smallest_key = block_metas[2].smallest_key.clone();
922 let seek_key = block_1_smallest_key.clone();
924 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
925 .await
926 .unwrap();
927 assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
928 let seek_key = prev_full_key(block_1_smallest_key.as_slice());
931 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
932 .await
933 .unwrap();
934 assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
935 iter.next().await.unwrap();
936 let block_1_second_key = iter.key().to_vec();
937 let seek_key = test_key_of(30001);
939 iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
940 .await
941 .unwrap();
942 assert!(!iter.is_valid());
943
944 let kr = KeyRange::new(
946 next_full_key(&block_1_smallest_key).into(),
947 prev_full_key(&block_2_smallest_key).into(),
948 );
949 let mut iter = ConcatSstableIterator::for_test(
950 vec![0],
951 table_infos.clone(),
952 kr.clone(),
953 sstable_store.clone(),
954 );
955 let seek_key = FullKey::decode(&block_2_smallest_key);
957 assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
958 iter.seek_idx(0, Some(seek_key)).await.unwrap();
959 assert!(!iter.is_valid());
960 let seek_key = test_key_of(0).encode();
962 iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
963 .await
964 .unwrap();
965 assert!(iter.is_valid());
966 assert_eq!(iter.key(), block_1_second_key.to_ref());
967
968 iter.seek_idx(0, None).await.unwrap();
970 assert!(iter.is_valid());
971 assert_eq!(iter.key(), block_1_second_key.to_ref());
972 }
973
974 #[tokio::test]
975 async fn test_filter_block_metas() {
976 use crate::hummock::compactor::iterator::filter_block_metas;
977
978 {
979 let block_metas = Vec::default();
980
981 let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
982
983 assert!(ret.is_empty());
984 }
985
986 {
987 let block_metas = vec![
988 BlockMeta {
989 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
990 ..Default::default()
991 },
992 BlockMeta {
993 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
994 ..Default::default()
995 },
996 BlockMeta {
997 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
998 ..Default::default()
999 },
1000 ];
1001
1002 let ret = filter_block_metas(
1003 &block_metas,
1004 &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
1005 KeyRange::default(),
1006 );
1007 let ret = &block_metas[ret];
1008
1009 assert_eq!(3, ret.len());
1010 assert_eq!(
1011 1,
1012 FullKey::decode(&ret[0].smallest_key)
1013 .user_key
1014 .table_id
1015 .as_raw_id()
1016 );
1017 assert_eq!(
1018 3,
1019 FullKey::decode(&ret[2].smallest_key)
1020 .user_key
1021 .table_id
1022 .as_raw_id()
1023 );
1024 }
1025
1026 {
1027 let block_metas = vec![
1028 BlockMeta {
1029 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1030 ..Default::default()
1031 },
1032 BlockMeta {
1033 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1034 ..Default::default()
1035 },
1036 BlockMeta {
1037 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1038 ..Default::default()
1039 },
1040 ];
1041
1042 let ret = filter_block_metas(
1043 &block_metas,
1044 &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1045 KeyRange::default(),
1046 );
1047 let ret = &block_metas[ret];
1048
1049 assert_eq!(2, ret.len());
1050 assert_eq!(
1051 2,
1052 FullKey::decode(&ret[0].smallest_key)
1053 .user_key
1054 .table_id
1055 .as_raw_id()
1056 );
1057 assert_eq!(
1058 3,
1059 FullKey::decode(&ret[1].smallest_key)
1060 .user_key
1061 .table_id
1062 .as_raw_id()
1063 );
1064 }
1065
1066 {
1067 let block_metas = vec![
1068 BlockMeta {
1069 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1070 ..Default::default()
1071 },
1072 BlockMeta {
1073 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1074 ..Default::default()
1075 },
1076 BlockMeta {
1077 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1078 ..Default::default()
1079 },
1080 ];
1081
1082 let ret = filter_block_metas(
1083 &block_metas,
1084 &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1085 KeyRange::default(),
1086 );
1087 let ret = &block_metas[ret];
1088
1089 assert_eq!(2, ret.len());
1090 assert_eq!(
1091 1,
1092 FullKey::decode(&ret[0].smallest_key)
1093 .user_key
1094 .table_id
1095 .as_raw_id()
1096 );
1097 assert_eq!(
1098 2,
1099 FullKey::decode(&ret[1].smallest_key)
1100 .user_key
1101 .table_id
1102 .as_raw_id()
1103 );
1104 }
1105
1106 {
1107 let block_metas = vec![
1108 BlockMeta {
1109 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1110 ..Default::default()
1111 },
1112 BlockMeta {
1113 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1114 ..Default::default()
1115 },
1116 BlockMeta {
1117 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1118 ..Default::default()
1119 },
1120 ];
1121 let ret = filter_block_metas(
1122 &block_metas,
1123 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1124 KeyRange::default(),
1125 );
1126 let ret = &block_metas[ret];
1127
1128 assert_eq!(1, ret.len());
1129 assert_eq!(
1130 2,
1131 FullKey::decode(&ret[0].smallest_key)
1132 .user_key
1133 .table_id
1134 .as_raw_id()
1135 );
1136 }
1137
1138 {
1139 let block_metas = vec![
1140 BlockMeta {
1141 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1142 ..Default::default()
1143 },
1144 BlockMeta {
1145 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1146 ..Default::default()
1147 },
1148 BlockMeta {
1149 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1150 ..Default::default()
1151 },
1152 BlockMeta {
1153 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1154 ..Default::default()
1155 },
1156 BlockMeta {
1157 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1158 ..Default::default()
1159 },
1160 ];
1161 let ret = filter_block_metas(
1162 &block_metas,
1163 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1164 KeyRange::default(),
1165 );
1166 let ret = &block_metas[ret];
1167
1168 assert_eq!(1, ret.len());
1169 assert_eq!(
1170 2,
1171 FullKey::decode(&ret[0].smallest_key)
1172 .user_key
1173 .table_id
1174 .as_raw_id()
1175 );
1176 }
1177
1178 {
1179 let block_metas = vec![
1180 BlockMeta {
1181 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1182 ..Default::default()
1183 },
1184 BlockMeta {
1185 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1186 ..Default::default()
1187 },
1188 BlockMeta {
1189 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1190 ..Default::default()
1191 },
1192 BlockMeta {
1193 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1194 ..Default::default()
1195 },
1196 BlockMeta {
1197 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1198 ..Default::default()
1199 },
1200 ];
1201
1202 let ret = filter_block_metas(
1203 &block_metas,
1204 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1205 KeyRange::default(),
1206 );
1207 let ret = &block_metas[ret];
1208
1209 assert_eq!(1, ret.len());
1210 assert_eq!(
1211 2,
1212 FullKey::decode(&ret[0].smallest_key)
1213 .user_key
1214 .table_id
1215 .as_raw_id()
1216 );
1217 }
1218 }
1219
1220 #[tokio::test]
1221 async fn test_iterator_same_obj() {
1222 let sstable_store = mock_sstable_store().await;
1223
1224 let table_info = gen_test_sstable_info(
1225 default_builder_opt_for_test(),
1226 1_u64,
1227 (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
1228 sstable_store.clone(),
1229 )
1230 .await;
1231
1232 let split_key = test_key_of(5000).encode();
1233 let sst_1: SstableInfo = SstableInfoInner {
1234 key_range: KeyRange {
1235 left: table_info.key_range.left.clone(),
1236 right: split_key.clone().into(),
1237 right_exclusive: true,
1238 },
1239 ..table_info.get_inner()
1240 }
1241 .into();
1242
1243 let total_key_count = sst_1.total_key_count;
1244 let sst_2: SstableInfo = SstableInfoInner {
1245 sst_id: sst_1.sst_id + 1,
1246 key_range: KeyRange {
1247 left: split_key.clone().into(),
1248 right: table_info.key_range.right.clone(),
1249 right_exclusive: table_info.key_range.right_exclusive,
1250 },
1251 ..table_info.get_inner()
1252 }
1253 .into();
1254
1255 {
1256 let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1258
1259 let mut iter = ConcatSstableIterator::for_test(
1260 vec![0],
1261 vec![sst_1.clone(), sst_2.clone()],
1262 KeyRange::default(),
1263 sstable_store.clone(),
1264 );
1265
1266 iter.rewind().await.unwrap();
1267
1268 let mut key_count = 0;
1269 while iter.is_valid() {
1270 let is_new_user_key = full_key_tracker.observe(iter.key());
1271 assert!(is_new_user_key);
1272 key_count += 1;
1273 iter.next().await.unwrap();
1274 }
1275
1276 assert_eq!(total_key_count, key_count);
1277 }
1278
1279 {
1280 let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
1281 let concat_1 = ConcatSstableIterator::for_test(
1282 vec![0],
1283 vec![sst_1.clone()],
1284 KeyRange::default(),
1285 sstable_store.clone(),
1286 );
1287
1288 let concat_2 = ConcatSstableIterator::for_test(
1289 vec![0],
1290 vec![sst_2.clone()],
1291 KeyRange::default(),
1292 sstable_store.clone(),
1293 );
1294
1295 let mut key_count = 0;
1296 let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
1297 iter.rewind().await.unwrap();
1298 while iter.is_valid() {
1299 full_key_tracker.observe(iter.key());
1300 key_count += 1;
1301 iter.next().await.unwrap();
1302 }
1303 assert_eq!(total_key_count, key_count);
1304 }
1305 }
1306}