1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::sync::atomic::AtomicU64;
18use std::sync::{Arc, atomic};
19use std::time::Instant;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use fail::fail_point;
23use risingwave_common::catalog::TableId;
24use risingwave_hummock_sdk::KeyComparator;
25use risingwave_hummock_sdk::compaction_group::StateTableId;
26use risingwave_hummock_sdk::key::FullKey;
27use risingwave_hummock_sdk::key_range::KeyRange;
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29
30use crate::hummock::block_stream::BlockDataStream;
31use crate::hummock::compactor::task_progress::TaskProgress;
32use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
33use crate::hummock::sstable_store::SstableStoreRef;
34use crate::hummock::value::HummockValue;
35use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
36use crate::monitor::StoreLocalStatistic;
37
/// Number of processed keys between two progress reports issued by
/// `MonitoredCompactorIterator::next`.
const PROGRESS_KEY_INTERVAL: usize = 100;
39
/// Forward iterator over the entries of a single SST, reading its blocks through a
/// [`BlockDataStream`] that is opened lazily.
pub struct SstableStreamIterator {
    /// Store used to open the block stream.
    sstable_store: SstableStoreRef,
    /// Metadata of the blocks to read, already narrowed by `filter_block_metas` in `new`.
    block_metas: Vec<BlockMeta>,
    /// Currently open block stream; `None` until the first block is requested and after an
    /// I/O error caused the stream to be discarded.
    block_stream: Option<BlockDataStream>,

    /// Iterator over the block loaded most recently; `None` once the iterator is exhausted.
    block_iter: Option<BlockIterator>,

    /// Number of blocks of `block_metas` that have been loaded so far, i.e. the index of the next
    /// block to request.
    block_idx: usize,

    /// Shared counter (taken from `StoreLocalStatistic::remote_io_time`) to which `next_block`
    /// adds the time it spent.
    stats_ptr: Arc<AtomicU64>,

    /// Description of the SST being iterated.
    sstable_info: SstableInfo,

    /// Table ids listed in `sstable_info`; blocks belonging to other tables are skipped.
    sstable_table_ids: HashSet<StateTableId>,
    /// Progress tracker whose pending-read-IO count is decremented when this iterator is dropped.
    task_progress: Arc<TaskProgress>,
    /// Number of times the block stream has been re-created after an object error.
    io_retry_times: usize,
    /// Upper bound for `io_retry_times`.
    max_io_retry_times: usize,

    /// Decoded left bound of `sstable_info.key_range`.
    key_range_left: FullKey<Vec<u8>>,
    /// Decoded right bound of `sstable_info.key_range`.
    key_range_right: FullKey<Vec<u8>>,
    /// Whether `key_range_right` itself lies outside the range.
    key_range_right_exclusive: bool,
}
73
74impl SstableStreamIterator {
75 pub fn new(
90 block_metas: Vec<BlockMeta>,
91 sstable_info: SstableInfo,
92 stats: &StoreLocalStatistic,
93 task_progress: Arc<TaskProgress>,
94 sstable_store: SstableStoreRef,
95 max_io_retry_times: usize,
96 ) -> Self {
97 let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
98
99 let block_metas = filter_block_metas(
101 &block_metas,
102 &sstable_table_ids,
103 sstable_info.key_range.clone(),
104 );
105
106 let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
107 let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
108 let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
109
110 Self {
111 block_stream: None,
112 block_iter: None,
113 block_metas,
114 block_idx: 0,
115 stats_ptr: stats.remote_io_time.clone(),
116 sstable_table_ids,
117 sstable_info,
118 sstable_store,
119 task_progress,
120 io_retry_times: 0,
121 max_io_retry_times,
122 key_range_left,
123 key_range_right,
124 key_range_right_exclusive,
125 }
126 }
127
128 async fn create_stream(&mut self) -> HummockResult<()> {
129 let block_stream = self
130 .sstable_store
131 .get_stream_for_blocks(
132 self.sstable_info.object_id,
133 &self.block_metas[self.block_idx..],
134 )
135 .instrument_await("stream_iter_get_stream".verbose())
136 .await?;
137 self.block_stream = Some(block_stream);
138 Ok(())
139 }
140
141 async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
142 while let Some(block_iter) = self.block_iter.as_mut() {
143 if self.sstable_table_ids.contains(&block_iter.table_id()) {
144 return Ok(());
145 } else {
146 self.next_block().await?;
147 }
148 }
149 Ok(())
150 }
151
152 pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
157 self.next_block().await?;
159
160 let seek_key = if let Some(seek_key) = seek_key {
165 if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
166 Some(self.key_range_left.to_ref())
167 } else {
168 Some(seek_key)
169 }
170 } else {
171 Some(self.key_range_left.to_ref())
172 };
173
174 if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
175 block_iter.seek(seek_key);
176
177 if !block_iter.is_valid() {
178 self.next_block().await?;
180 }
181 }
182
183 self.prune_from_valid_block_iter().await?;
184 Ok(())
185 }
186
    /// Loads the next block from the stream and positions `block_iter` on its first entry.
    ///
    /// When there is no further block, `block_iter` is set to `None` and `block_idx` is set to
    /// `block_metas.len()`. If reading fails with an object error and the retry budget is not
    /// used up, the stream is dropped and re-created starting at the current `block_idx`; any
    /// other error is returned to the caller.
    async fn next_block(&mut self) -> HummockResult<()> {
        let now = Instant::now();
        // When the guard is dropped (on every exit path), the time spent in this call is added to
        // the shared counter in milliseconds, rounded up.
        let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
        });
        if self.block_idx < self.block_metas.len() {
            loop {
                let ret = match &mut self.block_stream {
                    Some(block_stream) => block_stream.next_block().await,
                    None => {
                        // No stream yet (first call, or dropped after an error): open one and
                        // try again. A failure to open is not retried.
                        self.create_stream().await?;
                        continue;
                    }
                };
                match ret {
                    Ok(Some(block)) => {
                        let mut block_iter =
                            BlockIterator::new(BlockHolder::from_owned_block(block));
                        block_iter.seek_to_first();
                        self.block_idx += 1;
                        self.block_iter = Some(block_iter);
                        return Ok(());
                    }
                    // The stream has no more blocks: fall through to the exhausted state below.
                    Ok(None) => break,
                    Err(e) => {
                        // Only object errors are retried, and only within the retry budget.
                        if !e.is_object_error() || !self.need_recreate_io_stream() {
                            return Err(e);
                        }
                        // Discard the broken stream; the next loop iteration re-creates it.
                        self.block_stream.take();
                        self.io_retry_times += 1;
                        fail_point!("create_stream_err");

                        tracing::warn!(
                            "retry create stream for sstable {} times, sstinfo={}",
                            self.io_retry_times,
                            self.sst_debug_info()
                        );
                    }
                }
            }
        }
        // Exhausted: mark every block as consumed and invalidate the iterator.
        self.block_idx = self.block_metas.len();
        self.block_iter = None;

        Ok(())
    }
239
240 pub async fn next(&mut self) -> HummockResult<()> {
247 if !self.is_valid() {
248 return Ok(());
249 }
250
251 let block_iter = self.block_iter.as_mut().expect("no block iter");
252 block_iter.next();
253 if !block_iter.is_valid() {
254 self.next_block().await?;
255 self.prune_from_valid_block_iter().await?;
256 }
257
258 if !self.is_valid() {
259 return Ok(());
260 }
261
262 let key = self
264 .block_iter
265 .as_ref()
266 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
267 .key();
268
269 if self.exceed_key_range_right(key) {
270 self.block_iter = None;
271 }
272
273 Ok(())
274 }
275
276 pub fn key(&self) -> FullKey<&[u8]> {
277 let key = self
278 .block_iter
279 .as_ref()
280 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
281 .key();
282
283 assert!(
284 !self.exceed_key_range_left(key),
285 "key {:?} key_range_left {:?}",
286 key,
287 self.key_range_left.to_ref()
288 );
289
290 assert!(
291 !self.exceed_key_range_right(key),
292 "key {:?} key_range_right {:?} key_range_right_exclusive {}",
293 key,
294 self.key_range_right.to_ref(),
295 self.key_range_right_exclusive
296 );
297
298 key
299 }
300
301 pub fn value(&self) -> HummockValue<&[u8]> {
302 let raw_value = self
303 .block_iter
304 .as_ref()
305 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
306 .value();
307 HummockValue::from_slice(raw_value)
308 .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
309 }
310
311 pub fn is_valid(&self) -> bool {
312 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
314 }
315
316 fn sst_debug_info(&self) -> String {
317 format!(
318 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
319 self.sstable_info.object_id,
320 self.sstable_info.sst_id,
321 self.sstable_info.meta_offset,
322 self.sstable_info.table_ids
323 )
324 }
325
326 fn need_recreate_io_stream(&self) -> bool {
327 self.io_retry_times < self.max_io_retry_times
328 }
329
330 fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
331 key.cmp(&self.key_range_left.to_ref()).is_lt()
332 }
333
334 fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
335 if self.key_range_right_exclusive {
336 key.cmp(&self.key_range_right.to_ref()).is_ge()
337 } else {
338 key.cmp(&self.key_range_right.to_ref()).is_gt()
339 }
340 }
341}
342
343impl Drop for SstableStreamIterator {
344 fn drop(&mut self) {
345 self.task_progress.dec_num_pending_read_io()
346 }
347}
348
/// Forward iterator that walks a list of SSTs one after another, streaming each of them
/// through a [`SstableStreamIterator`].
pub struct ConcatSstableIterator {
    /// Key range used to clamp seek keys and to pre-filter the blocks of each SST.
    key_range: KeyRange,

    /// Iterator over the SST at `cur_idx`; `None` when no SST is open.
    sstable_iter: Option<SstableStreamIterator>,

    /// Index into `sstables` of the SST currently (or most recently) visited.
    cur_idx: usize,

    /// The SSTs to iterate over, in iteration order.
    sstables: Vec<SstableInfo>,

    /// Table ids of interest; SSTs and leading/trailing blocks of other tables are skipped.
    existing_table_ids: HashSet<StateTableId>,

    /// Store used to load SST metadata and block streams.
    sstable_store: SstableStoreRef,

    /// Local statistics collected while iterating; see `collect_local_statistic`.
    stats: StoreLocalStatistic,
    /// Progress tracker whose pending-read-IO count is incremented for every SST opened.
    task_progress: Arc<TaskProgress>,
    /// Retry budget handed to every [`SstableStreamIterator`] created.
    max_io_retry_times: usize,
}
373
374impl ConcatSstableIterator {
375 pub fn new(
379 existing_table_ids: Vec<StateTableId>,
380 sst_infos: Vec<SstableInfo>,
381 key_range: KeyRange,
382 sstable_store: SstableStoreRef,
383 task_progress: Arc<TaskProgress>,
384 max_io_retry_times: usize,
385 ) -> Self {
386 Self {
387 key_range,
388 sstable_iter: None,
389 cur_idx: 0,
390 sstables: sst_infos,
391 existing_table_ids: HashSet::from_iter(existing_table_ids),
392 sstable_store,
393 task_progress,
394 stats: StoreLocalStatistic::default(),
395 max_io_retry_times,
396 }
397 }
398
399 #[cfg(test)]
400 pub fn for_test(
401 existing_table_ids: Vec<impl Into<StateTableId>>,
402 sst_infos: Vec<SstableInfo>,
403 key_range: KeyRange,
404 sstable_store: SstableStoreRef,
405 ) -> Self {
406 Self::new(
407 existing_table_ids.into_iter().map(Into::into).collect(),
408 sst_infos,
409 key_range,
410 sstable_store,
411 Arc::new(TaskProgress::default()),
412 0,
413 )
414 }
415
416 async fn seek_idx(
418 &mut self,
419 idx: usize,
420 seek_key: Option<FullKey<&[u8]>>,
421 ) -> HummockResult<()> {
422 self.sstable_iter.take();
423 let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
424 {
425 (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
426 Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
427 Ordering::Greater => Some(seek_key),
428 },
429 (Some(seek_key), true) => Some(seek_key),
430 (None, true) => None,
431 (None, false) => Some(FullKey::decode(&self.key_range.left)),
432 };
433
434 self.cur_idx = idx;
435 while self.cur_idx < self.sstables.len() {
436 let table_info = &self.sstables[self.cur_idx];
437 let mut found = table_info
438 .table_ids
439 .iter()
440 .any(|table_id| self.existing_table_ids.contains(table_id));
441 if !found {
442 self.cur_idx += 1;
443 seek_key = None;
444 continue;
445 }
446 let sstable = self
447 .sstable_store
448 .sstable(table_info, &mut self.stats)
449 .instrument_await("stream_iter_sstable".verbose())
450 .await?;
451
452 let filter_key_range = match seek_key {
453 Some(seek_key) => {
454 KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
455 }
456 None => self.key_range.clone(),
457 };
458
459 let block_metas = filter_block_metas(
460 &sstable.meta.block_metas,
461 &self.existing_table_ids,
462 filter_key_range,
463 );
464
465 if block_metas.is_empty() {
466 found = false;
467 } else {
468 self.task_progress.inc_num_pending_read_io();
469 let mut sstable_iter = SstableStreamIterator::new(
470 block_metas,
471 table_info.clone(),
472 &self.stats,
473 self.task_progress.clone(),
474 self.sstable_store.clone(),
475 self.max_io_retry_times,
476 );
477 sstable_iter.seek(seek_key).await?;
478
479 if sstable_iter.is_valid() {
480 self.sstable_iter = Some(sstable_iter);
481 } else {
482 found = false;
483 }
484 }
485 if found {
486 return Ok(());
487 } else {
488 self.cur_idx += 1;
489 seek_key = None;
490 }
491 }
492 Ok(())
493 }
494}
495
496impl HummockIterator for ConcatSstableIterator {
497 type Direction = Forward;
498
499 async fn next(&mut self) -> HummockResult<()> {
500 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
501
502 sstable_iter.next().await?;
504 if sstable_iter.is_valid() {
505 Ok(())
506 } else {
507 self.seek_idx(self.cur_idx + 1, None).await?;
509 Ok(())
510 }
511 }
512
513 fn key(&self) -> FullKey<&[u8]> {
514 self.sstable_iter.as_ref().expect("no table iter").key()
515 }
516
517 fn value(&self) -> HummockValue<&[u8]> {
518 self.sstable_iter.as_ref().expect("no table iter").value()
519 }
520
521 fn is_valid(&self) -> bool {
522 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
523 }
524
525 async fn rewind(&mut self) -> HummockResult<()> {
526 self.seek_idx(0, None).await
527 }
528
529 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
531 let seek_key = if self.key_range.left.is_empty() {
532 key
533 } else {
534 match key.cmp(&FullKey::decode(&self.key_range.left)) {
535 Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
536 Ordering::Greater => key,
537 }
538 };
539 let table_idx = self.sstables.partition_point(|table| {
540 let max_sst_key = &table.key_range.right;
547 FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
548 });
549
550 self.seek_idx(table_idx, Some(key)).await
551 }
552
553 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
554 stats.add(&self.stats)
555 }
556
557 fn value_meta(&self) -> ValueMeta {
558 let iter = self.sstable_iter.as_ref().expect("no table iter");
559 assert!(iter.block_idx >= 1);
562 ValueMeta {
563 object_id: Some(iter.sstable_info.object_id),
564 block_id: Some(iter.block_idx as u64 - 1),
565 }
566 }
567}
568
/// Wrapper around a forward iterator that reports the number of processed keys to a
/// [`TaskProgress`] in batches of `PROGRESS_KEY_INTERVAL`.
pub struct MonitoredCompactorIterator<I> {
    /// The wrapped iterator; every call is forwarded to it.
    inner: I,
    /// Receiver of the progress reports.
    task_progress: Arc<TaskProgress>,

    /// Number of successful `next` calls since construction or the last `rewind`/`seek`.
    processed_key_num: usize,
}
575
576impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
577 pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
578 Self {
579 inner,
580 task_progress,
581 processed_key_num: 0,
582 }
583 }
584}
585
586impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
587 type Direction = Forward;
588
589 async fn next(&mut self) -> HummockResult<()> {
590 self.inner.next().await?;
591 self.processed_key_num += 1;
592
593 if self.processed_key_num.is_multiple_of(PROGRESS_KEY_INTERVAL) {
594 self.task_progress
595 .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
596 }
597
598 Ok(())
599 }
600
601 fn key(&self) -> FullKey<&[u8]> {
602 self.inner.key()
603 }
604
605 fn value(&self) -> HummockValue<&[u8]> {
606 self.inner.value()
607 }
608
609 fn is_valid(&self) -> bool {
610 self.inner.is_valid()
611 }
612
613 async fn rewind(&mut self) -> HummockResult<()> {
614 self.processed_key_num = 0;
615 self.inner.rewind().await?;
616 Ok(())
617 }
618
619 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
620 self.processed_key_num = 0;
621 self.inner.seek(key).await?;
622 Ok(())
623 }
624
625 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
626 self.inner.collect_local_statistic(stats)
627 }
628
629 fn value_meta(&self) -> ValueMeta {
630 self.inner.value_meta()
631 }
632}
633
634pub(crate) fn filter_block_metas(
635 block_metas: &Vec<BlockMeta>,
636 existing_table_ids: &HashSet<TableId>,
637 key_range: KeyRange,
638) -> Vec<BlockMeta> {
639 if block_metas.is_empty() {
640 return vec![];
641 }
642
643 let mut start_index = if key_range.left.is_empty() {
644 0
645 } else {
646 block_metas
648 .partition_point(|block| {
649 KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
650 != Ordering::Less
651 })
652 .saturating_sub(1)
653 };
654
655 let mut end_index = if key_range.right.is_empty() {
656 block_metas.len()
657 } else {
658 let ret = block_metas.partition_point(|block| {
659 KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
660 != Ordering::Greater
661 });
662
663 if ret == 0 {
664 return vec![];
666 }
667
668 ret
669 }
670 .saturating_sub(1);
671
672 while start_index <= end_index {
674 let start_block_table_id = block_metas[start_index].table_id();
675 if existing_table_ids.contains(&start_block_table_id) {
676 break;
677 }
678
679 let old_start_index = start_index;
681 let block_metas_to_search = &block_metas[start_index..=end_index];
682
683 start_index += block_metas_to_search
684 .partition_point(|block_meta| block_meta.table_id() == start_block_table_id);
685
686 if old_start_index == start_index {
687 break;
689 }
690 }
691
692 while start_index <= end_index {
693 let end_block_table_id = block_metas[end_index].table_id();
694 if existing_table_ids.contains(&end_block_table_id) {
695 break;
696 }
697
698 let old_end_index = end_index;
699 let block_metas_to_search = &block_metas[start_index..=end_index];
700
701 end_index = start_index
702 + block_metas_to_search
703 .partition_point(|block_meta| block_meta.table_id() < end_block_table_id)
704 .saturating_sub(1);
705
706 if end_index == old_end_index {
707 break;
709 }
710 }
711
712 if start_index > end_index {
713 return vec![];
714 }
715
716 block_metas[start_index..=end_index].to_vec()
717}
718
719#[cfg(test)]
720mod tests {
721 use std::cmp::Ordering;
722 use std::collections::HashSet;
723
724 use risingwave_common::catalog::TableId;
725 use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
726 use risingwave_hummock_sdk::key_range::KeyRange;
727 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
728
729 use crate::hummock::BlockMeta;
730 use crate::hummock::compactor::ConcatSstableIterator;
731 use crate::hummock::iterator::test_utils::mock_sstable_store;
732 use crate::hummock::iterator::{HummockIterator, MergeIterator};
733 use crate::hummock::test_utils::{
734 TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
735 test_value_of,
736 };
737 use crate::hummock::value::HummockValue;
738
    /// Exercises `seek`/`next` of `ConcatSstableIterator` over three consecutive SSTs with
    /// various key ranges.
    #[tokio::test]
    async fn test_concat_iterator() {
        // Build three SSTs; SST `n` holds keys `[n * TEST_KEYS_COUNT, (n + 1) * TEST_KEYS_COUNT)`.
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..3 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let start_index = 5000;
        let end_index = 25000;

        // Key range strictly inside the data: every key of the range must be visited in order.
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(end_index).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();

        for idx in start_index..end_index {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                val.into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }

        // Key range that starts at key 30000: the seek must end up invalid.
        let kr = KeyRange::new(
            test_key_of(30000).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert!(!iter.is_valid());
        // Key range with a right bound of 40000: iteration yields keys up to 29999 and then
        // becomes invalid.
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        for idx in start_index..30000 {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                val.into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Key range from 0 to 40000: seeks must select the expected SST (`cur_idx`) and land on
        // the exact key.
        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(test_key_of(10000).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10000).to_ref());
        iter.seek(test_key_of(10001).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 1 && iter.key() == test_key_of(10001).to_ref());
        iter.seek(test_key_of(9999).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(9999).to_ref());
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == test_key_of(1).to_ref());
        iter.seek(test_key_of(29999).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 2 && iter.key() == test_key_of(29999).to_ref());
        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());

        // Narrow key range: a seek past the right bound is invalid, a seek before the left bound
        // is clamped to the left bound.
        let kr = KeyRange::new(
            test_key_of(6000).encode().into(),
            test_key_of(16000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
    }
858
    /// Exercises `seek_idx` directly, using block boundaries of the first SST as seek keys.
    #[tokio::test]
    async fn test_concat_iterator_seek_idx() {
        // Build three SSTs; SST `n` holds only the upper half of its key interval.
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..3 {
            let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
            let end_index = (object_id + 1) * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }

        // Wide key range.
        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        let sst = sstable_store
            .sstable(&iter.sstables[0], &mut iter.stats)
            .await
            .unwrap();
        let block_metas = &sst.meta.block_metas;
        let block_1_smallest_key = block_metas[1].smallest_key.clone();
        let block_2_smallest_key = block_metas[2].smallest_key.clone();
        // Seek to the smallest key of block 1.
        let seek_key = block_1_smallest_key.clone();
        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
            .await
            .unwrap();
        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
        // Seek to the key right before block 1's smallest key: must land on block 1's smallest
        // key as well.
        let seek_key = prev_full_key(block_1_smallest_key.as_slice());
        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
            .await
            .unwrap();
        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
        iter.next().await.unwrap();
        let block_1_second_key = iter.key().to_vec();
        // Seek beyond the last key of the last SST.
        let seek_key = test_key_of(30001);
        iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Key range strictly between the smallest keys of block 1 and block 2.
        let kr = KeyRange::new(
            next_full_key(&block_1_smallest_key).into(),
            prev_full_key(&block_2_smallest_key).into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        // Seek key larger than the right bound of the key range.
        let seek_key = FullKey::decode(&block_2_smallest_key);
        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
        iter.seek_idx(0, Some(seek_key)).await.unwrap();
        assert!(!iter.is_valid());
        // Seek key smaller than the left bound: clamped to the left bound.
        let seek_key = test_key_of(0).encode();
        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
            .await
            .unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());

        // No seek key: starts at the left bound of the key range.
        iter.seek_idx(0, None).await.unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());
    }
946
947 #[tokio::test]
948 async fn test_filter_block_metas() {
949 use crate::hummock::compactor::iterator::filter_block_metas;
950
951 {
952 let block_metas = Vec::default();
953
954 let ret = filter_block_metas(&block_metas, &HashSet::default(), KeyRange::default());
955
956 assert!(ret.is_empty());
957 }
958
959 {
960 let block_metas = vec![
961 BlockMeta {
962 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
963 ..Default::default()
964 },
965 BlockMeta {
966 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
967 ..Default::default()
968 },
969 BlockMeta {
970 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
971 ..Default::default()
972 },
973 ];
974
975 let ret = filter_block_metas(
976 &block_metas,
977 &HashSet::from_iter(vec![1_u32.into(), 2.into(), 3.into()].into_iter()),
978 KeyRange::default(),
979 );
980
981 assert_eq!(3, ret.len());
982 assert_eq!(
983 1,
984 FullKey::decode(&ret[0].smallest_key)
985 .user_key
986 .table_id
987 .as_raw_id()
988 );
989 assert_eq!(
990 3,
991 FullKey::decode(&ret[2].smallest_key)
992 .user_key
993 .table_id
994 .as_raw_id()
995 );
996 }
997
998 {
999 let block_metas = vec![
1000 BlockMeta {
1001 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1002 ..Default::default()
1003 },
1004 BlockMeta {
1005 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1006 ..Default::default()
1007 },
1008 BlockMeta {
1009 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1010 ..Default::default()
1011 },
1012 ];
1013
1014 let ret = filter_block_metas(
1015 &block_metas,
1016 &HashSet::from_iter(vec![2_u32.into(), 3.into()].into_iter()),
1017 KeyRange::default(),
1018 );
1019
1020 assert_eq!(2, ret.len());
1021 assert_eq!(
1022 2,
1023 FullKey::decode(&ret[0].smallest_key)
1024 .user_key
1025 .table_id
1026 .as_raw_id()
1027 );
1028 assert_eq!(
1029 3,
1030 FullKey::decode(&ret[1].smallest_key)
1031 .user_key
1032 .table_id
1033 .as_raw_id()
1034 );
1035 }
1036
1037 {
1038 let block_metas = vec![
1039 BlockMeta {
1040 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1041 ..Default::default()
1042 },
1043 BlockMeta {
1044 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1045 ..Default::default()
1046 },
1047 BlockMeta {
1048 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1049 ..Default::default()
1050 },
1051 ];
1052
1053 let ret = filter_block_metas(
1054 &block_metas,
1055 &HashSet::from_iter(vec![1_u32.into(), 2_u32.into()].into_iter()),
1056 KeyRange::default(),
1057 );
1058
1059 assert_eq!(2, ret.len());
1060 assert_eq!(
1061 1,
1062 FullKey::decode(&ret[0].smallest_key)
1063 .user_key
1064 .table_id
1065 .as_raw_id()
1066 );
1067 assert_eq!(
1068 2,
1069 FullKey::decode(&ret[1].smallest_key)
1070 .user_key
1071 .table_id
1072 .as_raw_id()
1073 );
1074 }
1075
1076 {
1077 let block_metas = vec![
1078 BlockMeta {
1079 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1080 ..Default::default()
1081 },
1082 BlockMeta {
1083 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1084 ..Default::default()
1085 },
1086 BlockMeta {
1087 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1088 ..Default::default()
1089 },
1090 ];
1091 let ret = filter_block_metas(
1092 &block_metas,
1093 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1094 KeyRange::default(),
1095 );
1096
1097 assert_eq!(1, ret.len());
1098 assert_eq!(
1099 2,
1100 FullKey::decode(&ret[0].smallest_key)
1101 .user_key
1102 .table_id
1103 .as_raw_id()
1104 );
1105 }
1106
1107 {
1108 let block_metas = vec![
1109 BlockMeta {
1110 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1111 ..Default::default()
1112 },
1113 BlockMeta {
1114 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1115 ..Default::default()
1116 },
1117 BlockMeta {
1118 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1119 ..Default::default()
1120 },
1121 BlockMeta {
1122 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1123 ..Default::default()
1124 },
1125 BlockMeta {
1126 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1127 ..Default::default()
1128 },
1129 ];
1130 let ret = filter_block_metas(
1131 &block_metas,
1132 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1133 KeyRange::default(),
1134 );
1135
1136 assert_eq!(1, ret.len());
1137 assert_eq!(
1138 2,
1139 FullKey::decode(&ret[0].smallest_key)
1140 .user_key
1141 .table_id
1142 .as_raw_id()
1143 );
1144 }
1145
1146 {
1147 let block_metas = vec![
1148 BlockMeta {
1149 smallest_key: FullKey::for_test(TableId::new(1), Vec::default(), 0).encode(),
1150 ..Default::default()
1151 },
1152 BlockMeta {
1153 smallest_key: FullKey::for_test(TableId::new(2), Vec::default(), 0).encode(),
1154 ..Default::default()
1155 },
1156 BlockMeta {
1157 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1158 ..Default::default()
1159 },
1160 BlockMeta {
1161 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1162 ..Default::default()
1163 },
1164 BlockMeta {
1165 smallest_key: FullKey::for_test(TableId::new(3), Vec::default(), 0).encode(),
1166 ..Default::default()
1167 },
1168 ];
1169
1170 let ret = filter_block_metas(
1171 &block_metas,
1172 &HashSet::from_iter(vec![2_u32.into()].into_iter()),
1173 KeyRange::default(),
1174 );
1175
1176 assert_eq!(1, ret.len());
1177 assert_eq!(
1178 2,
1179 FullKey::decode(&ret[0].smallest_key)
1180 .user_key
1181 .table_id
1182 .as_raw_id()
1183 );
1184 }
1185 }
1186
    /// Two SST infos that refer to the same object but cover adjacent key ranges must together
    /// yield every key exactly once.
    #[tokio::test]
    async fn test_iterator_same_obj() {
        let sstable_store = mock_sstable_store().await;

        let table_info = gen_test_sstable_info(
            default_builder_opt_for_test(),
            1_u64,
            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
        )
        .await;

        // Split the object's key range at key 5000: `sst_1` covers everything below the split key
        // (exclusive right bound), `sst_2` everything from the split key onwards.
        let split_key = test_key_of(5000).encode();
        let sst_1: SstableInfo = SstableInfoInner {
            key_range: KeyRange {
                left: table_info.key_range.left.clone(),
                right: split_key.clone().into(),
                right_exclusive: true,
            },
            ..table_info.get_inner()
        }
        .into();

        let total_key_count = sst_1.total_key_count;
        let sst_2: SstableInfo = SstableInfoInner {
            sst_id: sst_1.sst_id + 1,
            key_range: KeyRange {
                left: split_key.clone().into(),
                right: table_info.key_range.right.clone(),
                right_exclusive: table_info.key_range.right_exclusive,
            },
            ..table_info.get_inner()
        }
        .into();

        // Both halves behind one concat iterator.
        {
            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());

            let mut iter = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_1.clone(), sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            iter.rewind().await.unwrap();

            let mut key_count = 0;
            while iter.is_valid() {
                // Every key returned must be a new user key.
                let is_new_user_key = full_key_tracker.observe(iter.key());
                assert!(is_new_user_key);
                key_count += 1;
                iter.next().await.unwrap();
            }

            assert_eq!(total_key_count, key_count);
        }

        // Each half behind its own concat iterator, merged by a `MergeIterator`.
        {
            let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
            let concat_1 = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_1.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            let concat_2 = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            let mut key_count = 0;
            let mut iter = MergeIterator::for_compactor(vec![concat_1, concat_2]);
            iter.rewind().await.unwrap();
            while iter.is_valid() {
                full_key_tracker.observe(iter.key());
                key_count += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(total_key_count, key_count);
        }
    }
1273}