1use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::sync::atomic::AtomicU64;
18use std::sync::{Arc, atomic};
19use std::time::Instant;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use fail::fail_point;
23use risingwave_hummock_sdk::KeyComparator;
24use risingwave_hummock_sdk::compaction_group::StateTableId;
25use risingwave_hummock_sdk::key::FullKey;
26use risingwave_hummock_sdk::key_range::KeyRange;
27use risingwave_hummock_sdk::sstable_info::SstableInfo;
28
29use crate::hummock::block_stream::BlockDataStream;
30use crate::hummock::compactor::task_progress::TaskProgress;
31use crate::hummock::iterator::{Forward, HummockIterator, ValueMeta};
32use crate::hummock::sstable_store::SstableStoreRef;
33use crate::hummock::value::HummockValue;
34use crate::hummock::{BlockHolder, BlockIterator, BlockMeta, HummockResult};
35use crate::monitor::StoreLocalStatistic;
36
/// Number of keys `MonitoredCompactorIterator` advances over between two progress reports to
/// the task progress.
const PROGRESS_KEY_INTERVAL: usize = 100;
38
/// Streams the KV pairs of a single SST by fetching its (pre-filtered) blocks through a
/// `BlockDataStream`, restricted to the SST's own key range.
pub struct SstableStreamIterator {
    /// Store used to open the block stream over `block_metas`.
    sstable_store: SstableStoreRef,
    /// Metadata of the blocks this iterator reads, in order.
    block_metas: Vec<BlockMeta>,
    /// Stream over the not-yet-loaded blocks. `None` until the first block is loaded, and reset
    /// to `None` after a retryable object-store error so that it gets recreated.
    block_stream: Option<BlockDataStream>,

    /// Iterator over the current block; `None` once the iterator is exhausted or invalidated.
    block_iter: Option<BlockIterator>,

    /// Number of blocks from `block_metas` loaded so far, i.e. the index of the next block to
    /// fetch.
    block_idx: usize,

    /// Counter to which `next_block` adds its elapsed time in milliseconds.
    stats_ptr: Arc<AtomicU64>,

    /// Metadata of the SST being read.
    sstable_info: SstableInfo,

    /// Table ids listed in `sstable_info`; blocks of any other table are skipped.
    sstable_table_ids: HashSet<StateTableId>,
    /// Shared task progress; its pending-read-IO counter is decremented when this is dropped.
    task_progress: Arc<TaskProgress>,
    /// How many times the block stream has been recreated after an object-store error.
    io_retry_times: usize,
    /// Maximum number of stream recreations allowed.
    max_io_retry_times: usize,

    /// Decoded left (inclusive) bound of the SST's key range.
    key_range_left: FullKey<Vec<u8>>,
    /// Decoded right bound of the SST's key range.
    key_range_right: FullKey<Vec<u8>>,
    /// Whether `key_range_right` itself is excluded from the range.
    key_range_right_exclusive: bool,
}
72
73impl SstableStreamIterator {
74 pub fn new(
89 block_metas: Vec<BlockMeta>,
90 sstable_info: SstableInfo,
91 stats: &StoreLocalStatistic,
92 task_progress: Arc<TaskProgress>,
93 sstable_store: SstableStoreRef,
94 max_io_retry_times: usize,
95 ) -> Self {
96 let sstable_table_ids = HashSet::from_iter(sstable_info.table_ids.iter().cloned());
97
98 let block_metas = filter_block_metas(
100 &block_metas,
101 &sstable_table_ids,
102 sstable_info.key_range.clone(),
103 );
104
105 let key_range_left = FullKey::decode(&sstable_info.key_range.left).to_vec();
106 let key_range_right = FullKey::decode(&sstable_info.key_range.right).to_vec();
107 let key_range_right_exclusive = sstable_info.key_range.right_exclusive;
108
109 Self {
110 block_stream: None,
111 block_iter: None,
112 block_metas,
113 block_idx: 0,
114 stats_ptr: stats.remote_io_time.clone(),
115 sstable_table_ids,
116 sstable_info,
117 sstable_store,
118 task_progress,
119 io_retry_times: 0,
120 max_io_retry_times,
121 key_range_left,
122 key_range_right,
123 key_range_right_exclusive,
124 }
125 }
126
127 async fn create_stream(&mut self) -> HummockResult<()> {
128 let block_stream = self
129 .sstable_store
130 .get_stream_for_blocks(
131 self.sstable_info.object_id,
132 &self.block_metas[self.block_idx..],
133 )
134 .instrument_await("stream_iter_get_stream".verbose())
135 .await?;
136 self.block_stream = Some(block_stream);
137 Ok(())
138 }
139
140 async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
141 while let Some(block_iter) = self.block_iter.as_mut() {
142 if self
143 .sstable_table_ids
144 .contains(&block_iter.table_id().table_id)
145 {
146 return Ok(());
147 } else {
148 self.next_block().await?;
149 }
150 }
151 Ok(())
152 }
153
154 pub async fn seek(&mut self, seek_key: Option<FullKey<&[u8]>>) -> HummockResult<()> {
159 self.next_block().await?;
161
162 let seek_key = if let Some(seek_key) = seek_key {
167 if seek_key.cmp(&self.key_range_left.to_ref()).is_lt() {
168 Some(self.key_range_left.to_ref())
169 } else {
170 Some(seek_key)
171 }
172 } else {
173 Some(self.key_range_left.to_ref())
174 };
175
176 if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
177 block_iter.seek(seek_key);
178
179 if !block_iter.is_valid() {
180 self.next_block().await?;
182 }
183 }
184
185 self.prune_from_valid_block_iter().await?;
186 Ok(())
187 }
188
189 async fn next_block(&mut self) -> HummockResult<()> {
194 let now = Instant::now();
196 let _time_stat = scopeguard::guard(self.stats_ptr.clone(), |stats_ptr: Arc<AtomicU64>| {
197 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
198 stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);
199 });
200 if self.block_idx < self.block_metas.len() {
201 loop {
202 let ret = match &mut self.block_stream {
203 Some(block_stream) => block_stream.next_block().await,
204 None => {
205 self.create_stream().await?;
206 continue;
207 }
208 };
209 match ret {
210 Ok(Some(block)) => {
211 let mut block_iter =
212 BlockIterator::new(BlockHolder::from_owned_block(block));
213 block_iter.seek_to_first();
214 self.block_idx += 1;
215 self.block_iter = Some(block_iter);
216 return Ok(());
217 }
218 Ok(None) => break,
219 Err(e) => {
220 if !e.is_object_error() || !self.need_recreate_io_stream() {
221 return Err(e);
222 }
223 self.block_stream.take();
224 self.io_retry_times += 1;
225 fail_point!("create_stream_err");
226
227 tracing::warn!(
228 "retry create stream for sstable {} times, sstinfo={}",
229 self.io_retry_times,
230 self.sst_debug_info()
231 );
232 }
233 }
234 }
235 }
236 self.block_idx = self.block_metas.len();
237 self.block_iter = None;
238
239 Ok(())
240 }
241
242 pub async fn next(&mut self) -> HummockResult<()> {
249 if !self.is_valid() {
250 return Ok(());
251 }
252
253 let block_iter = self.block_iter.as_mut().expect("no block iter");
254 block_iter.next();
255 if !block_iter.is_valid() {
256 self.next_block().await?;
257 self.prune_from_valid_block_iter().await?;
258 }
259
260 if !self.is_valid() {
261 return Ok(());
262 }
263
264 let key = self
266 .block_iter
267 .as_ref()
268 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
269 .key();
270
271 if self.exceed_key_range_right(key) {
272 self.block_iter = None;
273 }
274
275 Ok(())
276 }
277
278 pub fn key(&self) -> FullKey<&[u8]> {
279 let key = self
280 .block_iter
281 .as_ref()
282 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
283 .key();
284
285 assert!(
286 !self.exceed_key_range_left(key),
287 "key {:?} key_range_left {:?}",
288 key,
289 self.key_range_left.to_ref()
290 );
291
292 assert!(
293 !self.exceed_key_range_right(key),
294 "key {:?} key_range_right {:?} key_range_right_exclusive {}",
295 key,
296 self.key_range_right.to_ref(),
297 self.key_range_right_exclusive
298 );
299
300 key
301 }
302
303 pub fn value(&self) -> HummockValue<&[u8]> {
304 let raw_value = self
305 .block_iter
306 .as_ref()
307 .unwrap_or_else(|| panic!("no block iter sstinfo={}", self.sst_debug_info()))
308 .value();
309 HummockValue::from_slice(raw_value)
310 .unwrap_or_else(|_| panic!("decode error sstinfo={}", self.sst_debug_info()))
311 }
312
313 pub fn is_valid(&self) -> bool {
314 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
316 }
317
318 fn sst_debug_info(&self) -> String {
319 format!(
320 "object_id={}, sst_id={}, meta_offset={}, table_ids={:?}",
321 self.sstable_info.object_id,
322 self.sstable_info.sst_id,
323 self.sstable_info.meta_offset,
324 self.sstable_info.table_ids
325 )
326 }
327
328 fn need_recreate_io_stream(&self) -> bool {
329 self.io_retry_times < self.max_io_retry_times
330 }
331
332 fn exceed_key_range_left(&self, key: FullKey<&[u8]>) -> bool {
333 key.cmp(&self.key_range_left.to_ref()).is_lt()
334 }
335
336 fn exceed_key_range_right(&self, key: FullKey<&[u8]>) -> bool {
337 if self.key_range_right_exclusive {
338 key.cmp(&self.key_range_right.to_ref()).is_ge()
339 } else {
340 key.cmp(&self.key_range_right.to_ref()).is_gt()
341 }
342 }
343}
344
345impl Drop for SstableStreamIterator {
346 fn drop(&mut self) {
347 self.task_progress.dec_num_pending_read_io()
348 }
349}
350
/// Iterates over a list of SSTs one after another, reading only blocks within `key_range` that
/// belong to tables in `existing_table_ids`. `sstables` is assumed to be ordered by key range,
/// since `seek` binary-searches it by each SST's right bound.
pub struct ConcatSstableIterator {
    /// Key range used to select blocks; an empty `left`/`right` leaves that side unbounded.
    key_range: KeyRange,

    /// Stream iterator over the current SST; `None` when the iterator is exhausted.
    sstable_iter: Option<SstableStreamIterator>,

    /// Index into `sstables` of the SST currently being read.
    cur_idx: usize,

    /// The SSTs to iterate over.
    sstables: Vec<SstableInfo>,

    /// SSTs listing none of these tables are skipped entirely; leading/trailing blocks of other
    /// tables are filtered out.
    existing_table_ids: HashSet<StateTableId>,

    /// Store used to load SST metadata and block streams.
    sstable_store: SstableStoreRef,

    /// Statistics collected while loading SST metadata and streaming blocks.
    stats: StoreLocalStatistic,
    task_progress: Arc<TaskProgress>,
    /// Passed to each `SstableStreamIterator` as its stream-recreation limit.
    max_io_retry_times: usize,
}
375
376impl ConcatSstableIterator {
377 pub fn new(
381 existing_table_ids: Vec<StateTableId>,
382 sst_infos: Vec<SstableInfo>,
383 key_range: KeyRange,
384 sstable_store: SstableStoreRef,
385 task_progress: Arc<TaskProgress>,
386 max_io_retry_times: usize,
387 ) -> Self {
388 Self {
389 key_range,
390 sstable_iter: None,
391 cur_idx: 0,
392 sstables: sst_infos,
393 existing_table_ids: HashSet::from_iter(existing_table_ids),
394 sstable_store,
395 task_progress,
396 stats: StoreLocalStatistic::default(),
397 max_io_retry_times,
398 }
399 }
400
401 #[cfg(test)]
402 pub fn for_test(
403 existing_table_ids: Vec<StateTableId>,
404 sst_infos: Vec<SstableInfo>,
405 key_range: KeyRange,
406 sstable_store: SstableStoreRef,
407 ) -> Self {
408 Self::new(
409 existing_table_ids,
410 sst_infos,
411 key_range,
412 sstable_store,
413 Arc::new(TaskProgress::default()),
414 0,
415 )
416 }
417
418 async fn seek_idx(
420 &mut self,
421 idx: usize,
422 seek_key: Option<FullKey<&[u8]>>,
423 ) -> HummockResult<()> {
424 self.sstable_iter.take();
425 let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
426 {
427 (Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
428 Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
429 Ordering::Greater => Some(seek_key),
430 },
431 (Some(seek_key), true) => Some(seek_key),
432 (None, true) => None,
433 (None, false) => Some(FullKey::decode(&self.key_range.left)),
434 };
435
436 self.cur_idx = idx;
437 while self.cur_idx < self.sstables.len() {
438 let table_info = &self.sstables[self.cur_idx];
439 let mut found = table_info
440 .table_ids
441 .iter()
442 .any(|table_id| self.existing_table_ids.contains(table_id));
443 if !found {
444 self.cur_idx += 1;
445 seek_key = None;
446 continue;
447 }
448 let sstable = self
449 .sstable_store
450 .sstable(table_info, &mut self.stats)
451 .instrument_await("stream_iter_sstable".verbose())
452 .await?;
453
454 let filter_key_range = match seek_key {
455 Some(seek_key) => {
456 KeyRange::new(seek_key.encode().into(), self.key_range.right.clone())
457 }
458 None => self.key_range.clone(),
459 };
460
461 let block_metas = filter_block_metas(
462 &sstable.meta.block_metas,
463 &self.existing_table_ids,
464 filter_key_range,
465 );
466
467 if block_metas.is_empty() {
468 found = false;
469 } else {
470 self.task_progress.inc_num_pending_read_io();
471 let mut sstable_iter = SstableStreamIterator::new(
472 block_metas,
473 table_info.clone(),
474 &self.stats,
475 self.task_progress.clone(),
476 self.sstable_store.clone(),
477 self.max_io_retry_times,
478 );
479 sstable_iter.seek(seek_key).await?;
480
481 if sstable_iter.is_valid() {
482 self.sstable_iter = Some(sstable_iter);
483 } else {
484 found = false;
485 }
486 }
487 if found {
488 return Ok(());
489 } else {
490 self.cur_idx += 1;
491 seek_key = None;
492 }
493 }
494 Ok(())
495 }
496}
497
498impl HummockIterator for ConcatSstableIterator {
499 type Direction = Forward;
500
501 async fn next(&mut self) -> HummockResult<()> {
502 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
503
504 sstable_iter.next().await?;
506 if sstable_iter.is_valid() {
507 Ok(())
508 } else {
509 self.seek_idx(self.cur_idx + 1, None).await?;
511 Ok(())
512 }
513 }
514
515 fn key(&self) -> FullKey<&[u8]> {
516 self.sstable_iter.as_ref().expect("no table iter").key()
517 }
518
519 fn value(&self) -> HummockValue<&[u8]> {
520 self.sstable_iter.as_ref().expect("no table iter").value()
521 }
522
523 fn is_valid(&self) -> bool {
524 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
525 }
526
527 async fn rewind(&mut self) -> HummockResult<()> {
528 self.seek_idx(0, None).await
529 }
530
531 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
533 let seek_key = if self.key_range.left.is_empty() {
534 key
535 } else {
536 match key.cmp(&FullKey::decode(&self.key_range.left)) {
537 Ordering::Less | Ordering::Equal => FullKey::decode(&self.key_range.left),
538 Ordering::Greater => key,
539 }
540 };
541 let table_idx = self.sstables.partition_point(|table| {
542 let max_sst_key = &table.key_range.right;
549 FullKey::decode(max_sst_key).cmp(&seek_key) == Ordering::Less
550 });
551
552 self.seek_idx(table_idx, Some(key)).await
553 }
554
555 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
556 stats.add(&self.stats)
557 }
558
559 fn value_meta(&self) -> ValueMeta {
560 let iter = self.sstable_iter.as_ref().expect("no table iter");
561 assert!(iter.block_idx >= 1);
564 ValueMeta {
565 object_id: Some(iter.sstable_info.object_id),
566 block_id: Some(iter.block_idx as u64 - 1),
567 }
568 }
569}
570
/// Wraps a forward iterator and reports processed keys to `task_progress` in batches of
/// `PROGRESS_KEY_INTERVAL`.
pub struct MonitoredCompactorIterator<I> {
    inner: I,
    task_progress: Arc<TaskProgress>,

    /// Keys advanced over since construction or the last `rewind`/`seek`.
    processed_key_num: usize,
}
577
578impl<I: HummockIterator<Direction = Forward>> MonitoredCompactorIterator<I> {
579 pub fn new(inner: I, task_progress: Arc<TaskProgress>) -> Self {
580 Self {
581 inner,
582 task_progress,
583 processed_key_num: 0,
584 }
585 }
586}
587
588impl<I: HummockIterator<Direction = Forward>> HummockIterator for MonitoredCompactorIterator<I> {
589 type Direction = Forward;
590
591 async fn next(&mut self) -> HummockResult<()> {
592 self.inner.next().await?;
593 self.processed_key_num += 1;
594
595 if self.processed_key_num % PROGRESS_KEY_INTERVAL == 0 {
596 self.task_progress
597 .inc_progress_key(PROGRESS_KEY_INTERVAL as _);
598 }
599
600 Ok(())
601 }
602
603 fn key(&self) -> FullKey<&[u8]> {
604 self.inner.key()
605 }
606
607 fn value(&self) -> HummockValue<&[u8]> {
608 self.inner.value()
609 }
610
611 fn is_valid(&self) -> bool {
612 self.inner.is_valid()
613 }
614
615 async fn rewind(&mut self) -> HummockResult<()> {
616 self.processed_key_num = 0;
617 self.inner.rewind().await?;
618 Ok(())
619 }
620
621 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
622 self.processed_key_num = 0;
623 self.inner.seek(key).await?;
624 Ok(())
625 }
626
627 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
628 self.inner.collect_local_statistic(stats)
629 }
630
631 fn value_meta(&self) -> ValueMeta {
632 self.inner.value_meta()
633 }
634}
635
636pub(crate) fn filter_block_metas(
637 block_metas: &Vec<BlockMeta>,
638 existing_table_ids: &HashSet<u32>,
639 key_range: KeyRange,
640) -> Vec<BlockMeta> {
641 if block_metas.is_empty() {
642 return vec![];
643 }
644
645 let mut start_index = if key_range.left.is_empty() {
646 0
647 } else {
648 block_metas
650 .partition_point(|block| {
651 KeyComparator::compare_encoded_full_key(&key_range.left, &block.smallest_key)
652 != Ordering::Less
653 })
654 .saturating_sub(1)
655 };
656
657 let mut end_index = if key_range.right.is_empty() {
658 block_metas.len()
659 } else {
660 let ret = block_metas.partition_point(|block| {
661 KeyComparator::compare_encoded_full_key(&block.smallest_key, &key_range.right)
662 != Ordering::Greater
663 });
664
665 if ret == 0 {
666 return vec![];
668 }
669
670 ret
671 }
672 .saturating_sub(1);
673
674 while start_index <= end_index {
676 let start_block_table_id = block_metas[start_index].table_id().table_id();
677 if existing_table_ids.contains(&start_block_table_id) {
678 break;
679 }
680
681 let old_start_index = start_index;
683 let block_metas_to_search = &block_metas[start_index..=end_index];
684
685 start_index += block_metas_to_search
686 .partition_point(|block_meta| block_meta.table_id().table_id() == start_block_table_id);
687
688 if old_start_index == start_index {
689 break;
691 }
692 }
693
694 while start_index <= end_index {
695 let end_block_table_id = block_metas[end_index].table_id().table_id();
696 if existing_table_ids.contains(&end_block_table_id) {
697 break;
698 }
699
700 let old_end_index = end_index;
701 let block_metas_to_search = &block_metas[start_index..=end_index];
702
703 end_index = start_index
704 + block_metas_to_search
705 .partition_point(|block_meta| block_meta.table_id().table_id() < end_block_table_id)
706 .saturating_sub(1);
707
708 if end_index == old_end_index {
709 break;
711 }
712 }
713
714 if start_index > end_index {
715 return vec![];
716 }
717
718 block_metas[start_index..=end_index].to_vec()
719}
720
#[cfg(test)]
mod tests {
    use std::cmp::Ordering;
    use std::collections::HashSet;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, next_full_key, prev_full_key};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};

    use crate::hummock::BlockMeta;
    use crate::hummock::compactor::ConcatSstableIterator;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::iterator::{HummockIterator, MergeIterator};
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_test_sstable_info, test_key_of,
        test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Builds one `BlockMeta` per entry of `table_ids`; each block's smallest key is an empty
    /// user key of that table at epoch 0.
    fn block_metas_of_tables(table_ids: &[u32]) -> Vec<BlockMeta> {
        table_ids
            .iter()
            .map(|&table_id| BlockMeta {
                smallest_key: FullKey::for_test(TableId::new(table_id), Vec::default(), 0)
                    .encode(),
                ..Default::default()
            })
            .collect()
    }

    /// Table id encoded in the smallest key of `meta`.
    fn smallest_key_table_id(meta: &BlockMeta) -> u32 {
        FullKey::decode(&meta.smallest_key)
            .user_key
            .table_id
            .table_id()
    }

    #[tokio::test]
    async fn test_concat_iterator() {
        let sstable_store = mock_sstable_store().await;

        // Three consecutive SSTs, each holding `TEST_KEYS_COUNT` keys.
        let mut table_infos = Vec::new();
        for object_id in 0..3 {
            let first_key_idx = object_id * TEST_KEYS_COUNT;
            let key_indices = first_key_idx..first_key_idx + TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                key_indices.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }

        let build_iter = |kr: &KeyRange| {
            ConcatSstableIterator::for_test(
                vec![0],
                table_infos.clone(),
                kr.clone(),
                sstable_store.clone(),
            )
        };

        // A range spanning the first two SSTs.
        let (start_index, end_index) = (5000, 25000);
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(end_index).encode().into(),
        );
        let mut iter = build_iter(&kr);
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        for idx in start_index..end_index {
            assert_eq!(iter.key(), test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                iter.value().into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }

        // A range entirely past the last SST.
        let kr = KeyRange::new(
            test_key_of(30000).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = build_iter(&kr);
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        assert!(!iter.is_valid());

        // A range whose right side extends past the last SST.
        let kr = KeyRange::new(
            test_key_of(start_index).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = build_iter(&kr);
        iter.seek(FullKey::decode(&kr.left)).await.unwrap();
        for idx in start_index..30000 {
            assert_eq!(iter.key(), test_key_of(idx).to_ref(), "failed at {}", idx);
            assert_eq!(
                iter.value().into_user_value().unwrap(),
                test_value_of(idx).as_slice()
            );
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());

        // Repeated seeks land in the expected SST: (key index, expected `cur_idx`).
        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = build_iter(&kr);
        for (key_idx, expected_sst_idx) in [(10000, 1), (10001, 1), (9999, 0), (1, 0), (29999, 2)]
        {
            iter.seek(test_key_of(key_idx).to_ref()).await.unwrap();
            assert!(
                iter.is_valid()
                    && iter.cur_idx == expected_sst_idx
                    && iter.key() == test_key_of(key_idx).to_ref()
            );
        }
        iter.seek(test_key_of(30000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());

        // Seeks outside a narrow range: past it, and before it (clamped to its left bound).
        let kr = KeyRange::new(
            test_key_of(6000).encode().into(),
            test_key_of(16000).encode().into(),
        );
        let mut iter = build_iter(&kr);
        iter.seek(test_key_of(17000).to_ref()).await.unwrap();
        assert!(!iter.is_valid());
        iter.seek(test_key_of(1).to_ref()).await.unwrap();
        assert!(iter.is_valid() && iter.cur_idx == 0 && iter.key() == FullKey::decode(&kr.left));
    }

    #[tokio::test]
    async fn test_concat_iterator_seek_idx() {
        let sstable_store = mock_sstable_store().await;

        // Three SSTs, each holding only the upper half of its `TEST_KEYS_COUNT` key slot.
        let mut table_infos = Vec::new();
        for object_id in 0..3 {
            let slot_mid = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
            let slot_end = (object_id + 1) * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (slot_mid..slot_end)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }

        let kr = KeyRange::new(
            test_key_of(0).encode().into(),
            test_key_of(40000).encode().into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );
        let sst = sstable_store
            .sstable(&iter.sstables[0], &mut iter.stats)
            .await
            .unwrap();
        let block_metas = &sst.meta.block_metas;
        let block_1_smallest_key = block_metas[1].smallest_key.clone();
        let block_2_smallest_key = block_metas[2].smallest_key.clone();

        // Seeking exactly to block 1's first key lands on it.
        iter.seek_idx(0, Some(FullKey::decode(&block_1_smallest_key)))
            .await
            .unwrap();
        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));

        // Seeking to the key just before block 1 also lands on block 1's first key.
        let before_block_1 = prev_full_key(block_1_smallest_key.as_slice());
        iter.seek_idx(0, Some(FullKey::decode(&before_block_1)))
            .await
            .unwrap();
        assert!(iter.is_valid() && iter.key() == FullKey::decode(block_1_smallest_key.as_slice()));
        iter.next().await.unwrap();
        let block_1_second_key = iter.key().to_vec();

        // Seeking past every key of the last SST invalidates the iterator.
        let beyond_last = test_key_of(30001);
        iter.seek_idx(table_infos.len() - 1, Some(beyond_last.to_ref()))
            .await
            .unwrap();
        assert!(!iter.is_valid());

        // Restrict the range to strictly between block 1's and block 2's first keys.
        let kr = KeyRange::new(
            next_full_key(&block_1_smallest_key).into(),
            prev_full_key(&block_2_smallest_key).into(),
        );
        let mut iter = ConcatSstableIterator::for_test(
            vec![0],
            table_infos.clone(),
            kr.clone(),
            sstable_store.clone(),
        );

        // A seek key beyond the range's right bound finds nothing.
        let seek_key = FullKey::decode(&block_2_smallest_key);
        assert!(seek_key.cmp(&FullKey::decode(&kr.right)) == Ordering::Greater);
        iter.seek_idx(0, Some(seek_key)).await.unwrap();
        assert!(!iter.is_valid());

        // A seek key before the range is clamped to its left bound.
        let seek_key = test_key_of(0).encode();
        iter.seek_idx(0, Some(FullKey::decode(&seek_key)))
            .await
            .unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());

        // No seek key also starts at the range's left bound.
        iter.seek_idx(0, None).await.unwrap();
        assert!(iter.is_valid());
        assert_eq!(iter.key(), block_1_second_key.to_ref());
    }

    #[tokio::test]
    async fn test_filter_block_metas() {
        use crate::hummock::compactor::iterator::filter_block_metas;

        // No blocks at all.
        let no_blocks: Vec<BlockMeta> = Vec::new();
        let filtered = filter_block_metas(&no_blocks, &HashSet::default(), KeyRange::default());
        assert!(filtered.is_empty());

        // (block table ids, existing table ids, expected length, expected first, expected last)
        let cases: [(&[u32], &[u32], usize, u32, u32); 6] = [
            (&[1, 2, 3], &[1, 2, 3], 3, 1, 3),
            (&[1, 2, 3], &[2, 3], 2, 2, 3),
            (&[1, 2, 3], &[1, 2], 2, 1, 2),
            (&[1, 2, 3], &[2], 1, 2, 2),
            (&[1, 1, 1, 2, 3], &[2], 1, 2, 2),
            (&[1, 2, 3, 3, 3], &[2], 1, 2, 2),
        ];
        for (block_tables, existing, expected_len, expected_first, expected_last) in cases {
            let block_metas = block_metas_of_tables(block_tables);
            let existing_table_ids: HashSet<u32> = existing.iter().copied().collect();

            let filtered =
                filter_block_metas(&block_metas, &existing_table_ids, KeyRange::default());

            assert_eq!(expected_len, filtered.len());
            assert_eq!(expected_first, smallest_key_table_id(&filtered[0]));
            assert_eq!(
                expected_last,
                smallest_key_table_id(&filtered[expected_len - 1])
            );
        }
    }

    #[tokio::test]
    async fn test_iterator_same_obj() {
        let sstable_store = mock_sstable_store().await;

        let table_info = gen_test_sstable_info(
            default_builder_opt_for_test(),
            1_u64,
            (1..10000).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
            sstable_store.clone(),
        )
        .await;

        // Split the single object into two logical SSTs at key 5000: the first one excludes the
        // split key, the second one starts at it.
        let split_key = test_key_of(5000).encode();
        let sst_1: SstableInfo = SstableInfoInner {
            key_range: KeyRange {
                left: table_info.key_range.left.clone(),
                right: split_key.clone().into(),
                right_exclusive: true,
            },
            ..table_info.get_inner()
        }
        .into();

        let total_key_count = sst_1.total_key_count;
        let sst_2: SstableInfo = SstableInfoInner {
            sst_id: sst_1.sst_id + 1,
            key_range: KeyRange {
                left: split_key.clone().into(),
                right: table_info.key_range.right.clone(),
                right_exclusive: table_info.key_range.right_exclusive,
            },
            ..table_info.get_inner()
        }
        .into();

        // One concat iterator over both halves yields every key exactly once.
        {
            let mut tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
            let mut iter = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_1.clone(), sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );
            iter.rewind().await.unwrap();

            let mut key_count = 0;
            while iter.is_valid() {
                assert!(tracker.observe(iter.key()));
                key_count += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(total_key_count, key_count);
        }

        // Merging one concat iterator per half yields the same number of keys.
        {
            let mut tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
            let left_half = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_1.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );
            let right_half = ConcatSstableIterator::for_test(
                vec![0],
                vec![sst_2.clone()],
                KeyRange::default(),
                sstable_store.clone(),
            );

            let mut iter = MergeIterator::for_compactor(vec![left_half, right_half]);
            iter.rewind().await.unwrap();

            let mut key_count = 0;
            while iter.is_valid() {
                tracker.observe(iter.key());
                key_count += 1;
                iter.next().await.unwrap();
            }
            assert_eq!(total_key_count, key_count);
        }
    }
}