1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::{self};
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::array::VectorRef;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{TableId, TableOption};
29use risingwave_common::hash::VirtualNode;
30use risingwave_common::util::epoch::MAX_SPILL_TIMES;
31use risingwave_hummock_sdk::key::{
32 FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
33};
34use risingwave_hummock_sdk::key_range::KeyRangeCommon;
35use risingwave_hummock_sdk::sstable_info::SstableInfo;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
40use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
41use risingwave_pb::hummock::LevelType;
42use sync_point::sync_point;
43use tracing::warn;
44
45use crate::error::StorageResult;
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::iterator::change_log::ChangeLogIterator;
48use crate::hummock::iterator::{
49 BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
50};
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
53use crate::hummock::sstable_store::SstableStoreRef;
54use crate::hummock::table_change_log_manager::TableChangeLogManager;
55use crate::hummock::utils::{
56 MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
57 range_overlap, search_sst_idx,
58};
59use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
60use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
61use crate::hummock::{
62 BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
63 HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
64 ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
65 hit_sstable_bloom_filter,
66};
67use crate::mem_table::{
68 ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
69};
70use crate::monitor::{
71 GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
72};
73use crate::store::{
74 OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
75};
76use crate::vector::hnsw::nearest;
77use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
78
/// The committed part of a read version: a pinned (ref-counted) Hummock version.
pub type CommittedVersion = PinnedVersion;
80
/// Metadata produced by one upload of staging data: the imms that were
/// flushed, the epochs their data covers, and the resulting SST files.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // SSTs holding the new (current) values.
    sstable_infos: Vec<LocalSstableInfo>,
    // SSTs holding the old values, if any (used by the change log).
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    // Epochs covered by this upload, ordered newest-first (non-increasing,
    // enforced by the assertion in `Self::new`).
    epochs: Vec<HummockEpoch>,
    // For each local instance, the ids of the imms flushed into these SSTs.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    // Total in-memory size of the flushed imms.
    imm_size: usize,
}
98
impl StagingSstableInfo {
    /// Build a new `StagingSstableInfo`.
    ///
    /// `epochs` must be ordered newest-first (non-increasing), which the
    /// assertion below enforces.
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // Each consecutive pair must satisfy `next <= prev`, i.e. the epochs
        // are non-increasing (newest epoch first).
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    /// SSTs holding the new values of this upload.
    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    /// SSTs holding the old values of this upload (may be empty).
    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    /// Total in-memory size of the imms that were flushed.
    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    /// Epochs covered by this upload, newest-first.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    /// Flushed imm ids, grouped by local instance.
    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}
138
/// Events that drive a `HummockReadVersion` forward.
pub enum VersionUpdate {
    /// Staging imms have been uploaded into the given staging SSTs.
    Sst(Arc<StagingSstableInfo>),
    /// A new committed version snapshot has been observed.
    CommittedSnapshot(CommittedVersion),
    /// A table watermark was written locally at `epoch`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
149
/// The uncommitted, instance-local part of a read version.
pub struct StagingVersion {
    // Total byte size of `pending_imms`.
    pending_imm_size: usize,
    /// Imms not yet handed to the uploader, with their memory trackers.
    /// New imms are pushed to the back (see `HummockReadVersion::add_pending_imm`).
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// Imms currently being uploaded; the newest is at the front
    /// (`start_upload_pending_imms` uses `push_front`).
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Uploaded but not-yet-committed SSTs; the newest is at the front
    /// (`HummockReadVersion::update` uses `push_front`).
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
169
impl StagingVersion {
    /// Return the staging imms and staging SSTs that may contain data of
    /// `table_id` within `table_key_range` visible at `max_epoch_inclusive`.
    ///
    /// Imms are yielded newest-first: pending imms in reverse insertion
    /// order, then the uploading imms (front is newest).
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .map(|(imm, _)| imm)
            .rev()
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // Keep imms that start at or before the read epoch, belong to
                // this table, and overlap the requested key range.
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                // NOTE(review): `epochs` is ordered newest-first (asserted in
                // `StagingSstableInfo::new`), so `last()` is the *oldest*
                // epoch despite the `sst_max_epoch` name — confirm intent.
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    /// True when there is no uncommitted state at all.
    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}
228
/// A per-instance read snapshot of one table: its staging (uncommitted)
/// data plus the latest observed committed version.
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    // Set once by `init`; imm ingestion asserts on it.
    is_initialized: bool,

    // Uncommitted imms and staging SSTs local to this instance.
    staging: StagingVersion,

    // Latest committed version this instance has observed.
    committed: CommittedVersion,

    // A replicated read version receives data via `add_replicated_imm` and
    // never uploads it itself (see the asserts in `update`).
    is_replicated: bool,

    // Index over committed and locally written table watermarks.
    table_watermarks: Option<TableWatermarksIndex>,

    // Vnodes currently owned by this instance.
    vnodes: Arc<Bitmap>,
}
254
impl HummockReadVersion {
    /// Create a read version for `table_id` on `instance_id`, optionally
    /// marked as replicated.
    ///
    /// The watermark index is seeded from the committed version when the
    /// table already has committed watermarks.
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // A read version may only be built on a valid committed version.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
                        table_watermarks.clone(),
                        committed_version
                            .state_table_info
                            .info()
                            .get(&table_id)
                            .expect("should exist")
                            .committed_epoch,
                        table_watermarks.watermark_type,
                    )),
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    /// Create a non-replicated read version.
    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    /// Table this read version serves.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Mark the read version initialized; must be called exactly once.
    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    /// Append a new imm (with its memory tracker) to the pending list.
    ///
    /// Only valid on an initialized, non-replicated version. Batch ids must
    /// be strictly increasing across pending and uploading imms.
    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .map(|(imm, _)| imm)
            .or_else(|| self.staging.uploading_imms.front())
        {
            // The incoming imm must be newer than any imm we already track.
            assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push((imm, tracker));
    }

    /// Add an imm to a replicated read version.
    ///
    /// Replicated versions never upload, so the imm goes straight onto the
    /// front of `uploading_imms` and `pending_imms` stays empty.
    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        assert!(self.is_replicated);
        assert!(self.staging.pending_imms.is_empty());
        if let Some(item) = self.staging.uploading_imms.front() {
            assert!(item.batch_id() < imm.batch_id());
        }
        self.staging.uploading_imms.push_front(imm);
    }

    /// Total byte size of imms not yet handed to the uploader.
    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    /// Move all pending imms into the uploading queue (newest ends up at the
    /// front) and return them, with their trackers, for the uploader.
    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for (imm, _) in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Apply a version update.
    ///
    /// - `Sst`: pop the uploaded imms from the back of `uploading_imms`
    ///   (they must match the recorded imm ids exactly) and record the
    ///   staging SST at the front of `staging.sst`.
    /// - `CommittedSnapshot`: prune staging data covered by the new
    ///   committed epoch and merge in committed watermarks.
    /// - `NewTableWatermark`: add a locally written watermark to the index.
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    assert!(!self.is_replicated);
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // The uploaded imm ids must match the tail of
                    // `uploading_imms` exactly, oldest first.
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "miss match id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                staging_sst.imm_ids {:?},
                                staging_sst.epochs {:?},
                                local_pending_imm_ids {:?},
                                local_uploading_imm_ids {:?},
                                instance_id {}
                                check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|(imm, _)| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        // Replicated data is not uploaded by this instance, so
                        // imms at or below the committed epoch are just dropped.
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|(imm, _)| imm.min_epoch() > committed_epoch);
                    } else {
                        // Non-replicated imms must all be strictly newer than
                        // the committed epoch; older ones should already have
                        // been converted to staging SSTs.
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm)
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // Every retained staging SST must lie entirely above the
                    // committed epoch.
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                                committed_watermarks.watermark_type,
                            ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(TableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                        watermark_type,
                    ));
                }
            }
        }
    }

    /// The staging (uncommitted) part of this read version.
    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    /// The committed part of this read version.
    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// Filter out watermarks that would regress relative to the current
    /// watermark index (no-op when no index exists).
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    /// Latest watermark for `vnode`, if any watermark index exists.
    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    /// Whether `init` has been called.
    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    /// Whether this read version is replicated.
    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    /// Replace the owned vnode bitmap, returning the previous one.
    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    /// Whether `vnode` is owned by this instance.
    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    /// The owned vnode bitmap.
    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}
560
561pub fn read_filter_for_version(
562 epoch: HummockEpoch,
563 table_id: TableId,
564 mut table_key_range: TableKeyRange,
565 read_version: &RwLock<HummockReadVersion>,
566) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
567 let read_version_guard = read_version.read();
568
569 let committed_version = read_version_guard.committed().clone();
570
571 if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
572 watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
573 }
574
575 let (imm_iter, sst_iter) =
576 read_version_guard
577 .staging()
578 .prune_overlap(epoch, table_id, &table_key_range);
579
580 let imms = imm_iter.cloned().collect();
581 let ssts = sst_iter.cloned().collect();
582
583 Ok((table_key_range, (imms, ssts, committed_version)))
584}
585
/// Stateless reader that serves gets/iterators against a read-version tuple.
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics shared by all reads issued through this reader.
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    // Retry budget for block preloading when iterator prefetch is enabled
    // (see `iter_inner`).
    preload_retry_times: usize,
}
594
595impl HummockVersionReader {
598 pub fn new(
599 sstable_store: SstableStoreRef,
600 state_store_metrics: Arc<HummockStateStoreMetrics>,
601 preload_retry_times: usize,
602 ) -> Self {
603 Self {
604 sstable_store,
605 state_store_metrics,
606 preload_retry_times,
607 }
608 }
609
610 pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
611 &self.state_store_metrics
612 }
613}
614
/// Threshold (in seconds) above which SST-meta fetching during iterator
/// creation is logged and reported as slow.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
616
617impl HummockVersionReader {
618 fn skip_get_by_vnode_user_key_range(
619 sstable_info: &SstableInfo,
620 vnode: VirtualNode,
621 user_key: UserKey<&[u8]>,
622 local_stats: &mut StoreLocalStatistic,
623 ) -> bool {
624 if let Some(vnode_statistics) = &sstable_info.vnode_statistics {
625 if let Some((vnode_min, vnode_max)) = vnode_statistics.get_vnode_user_key_range(vnode) {
628 local_stats.vnode_checked_get_count += 1;
629 if user_key < vnode_min.as_ref() || user_key > vnode_max.as_ref() {
630 local_stats.vnode_pruned_get_count += 1;
631 return true;
632 }
633 }
634 }
635 false
636 }
637
    /// Point lookup of `table_key` in `table_id` as of `epoch`.
    ///
    /// Searches, from newest to oldest: staging imms, staging (uncommitted)
    /// SSTs, then committed levels (overlapping levels SST-by-SST,
    /// non-overlapping levels via binary search). The first match is mapped
    /// through `on_key_value_fn`; `None` is returned when the key is absent,
    /// deleted, or its newest visible version is older than the retention
    /// min-epoch.
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        // Assume found; reset to false at the end if nothing matched.
        local_stats.found_key = true;

        // 1. Staging imms.
        for imm in &imms {
            // Imm is wholly older than the retention cutoff: skip it.
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    // Newest visible version is expired by retention.
                    None
                } else {
                    // `into_user_value` yields None for deletes.
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // Bloom-filter hash of the distribution-key prefix hint, if provided.
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        // Lookup key built with the maximal spill offset so that every
        // version written at `epoch` is covered.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        // 2. Staging (uncommitted) SSTs.
        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 3. Committed levels.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        // Per-vnode key-range statistics may rule this file out.
                        if Self::skip_get_by_vnode_user_key_range(
                            sstable_info,
                            VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                            full_key.user_key.as_ref(),
                            local_stats,
                        ) {
                            continue;
                        }

                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    // Binary-search the sorted file list; the candidate file
                    // is the one just before the returned partition index.
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let sstable_info = &level.table_infos[table_info_idx];

                    if sstable_info.table_ids.binary_search(&table_id).is_err() {
                        continue;
                    }

                    // The key may still fall beyond the candidate file's
                    // right bound; then no file in this level contains it.
                    let ord = sstable_info
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    if Self::skip_get_by_vnode_user_key_range(
                        sstable_info,
                        VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                        full_key.user_key.as_ref(),
                        local_stats,
                    ) {
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        sstable_info,
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }
870
    /// Create a forward iterator over `table_key_range` at `epoch`.
    ///
    /// Thin wrapper around [`Self::iter_with_memtable`] with no extra
    /// memtable iterator.
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            table_option,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }
891
    /// Create a forward iterator, optionally merging a caller-provided
    /// memtable iterator on top of the staging and committed data.
    ///
    /// All sub-iterators are assembled by [`Self::iter_inner`], merged, then
    /// wrapped in a `UserIterator` that enforces epoch visibility, retention
    /// (`min_epoch`) and the user key range. The iterator is rewound before
    /// being returned.
    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }
940
    /// Backward counterpart of [`Self::iter_with_memtable`]: same assembly
    /// via [`Self::iter_inner`], but with a `BackwardIteratorFactory` and a
    /// `BackwardUserIterator`.
    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        table_option: TableOption,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }
989
    /// Push every sub-iterator needed for a range read into `factory`:
    /// one batch iterator per imm, one iterator per bloom-surviving staging
    /// SST, and per committed level either a concat iterator (several
    /// non-overlapping files) or individual SST iterators. Updates the
    /// per-category counters in `local_stats` and reports slow metadata
    /// fetching.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            // Validate the range: an inverted range (right < left) panics in
            // debug builds and returns an error in release builds.
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // Bloom-filter hash of the prefix hint; lets us skip whole SSTs.
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Time the metadata fetching for the committed levels below.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Several disjoint files: one concat iterator suffices.
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    // Single file: the bloom filter may still skip it.
                    let sstable_info = &sstable_infos[0];

                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Files of an overlapping level are visited in reverse of the
                // pruned order.
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1181
    /// Build a [`ChangeLogIterator`] over `epoch_range` for
    /// `options.table_id`, merging the table's new-value and old-value
    /// change-log SSTs that overlap `key_range`.
    pub async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
        table_change_log_manager: Arc<TableChangeLogManager>,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log: Vec<_> = {
            let table_change_logs = table_change_log_manager
                .fetch_table_change_logs(options.table_id, epoch_range, false, None)
                .await?;
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        // Warn (do not fail) when the requested upper epoch is absent from
        // the fetched log entries.
        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        // Open all SSTs of one side (new or old values) concurrently and
        // merge them; per-SST stats are folded into `local_stat` afterwards.
        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    // Each task keeps its own stats so the futures stay
                    // independent; they are merged below.
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }
1287
1288 pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
1289 &'a self,
1290 version: PinnedVersion,
1291 table_id: TableId,
1292 target: VectorRef<'a>,
1293 options: VectorNearestOptions,
1294 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1295 ) -> HummockResult<Vec<O>> {
1296 let Some(index) = version.vector_indexes.get(&table_id) else {
1297 return Ok(vec![]);
1298 };
1299 if target.dimension() != index.dimension {
1300 return Err(HummockError::other(format!(
1301 "target dimension {} not match index dimension {}",
1302 target.dimension(),
1303 index.dimension
1304 )));
1305 }
1306 match &index.inner {
1307 VectorIndexImpl::Flat(flat) => {
1308 let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
1309 let mut cache_stat = VectorStoreCacheStats::default();
1310 for vector_file in &flat.vector_store_info.vector_files {
1311 let meta = self
1312 .sstable_store
1313 .get_vector_file_meta(vector_file, &mut cache_stat)
1314 .await?;
1315 for (i, block_meta) in meta.block_metas.iter().enumerate() {
1316 let block = self
1317 .sstable_store
1318 .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
1319 .await?;
1320 builder.add(&**block, &on_nearest_item_fn);
1321 }
1322 }
1323 cache_stat.report(table_id, "flat", self.stats());
1324 Ok(builder.finish())
1325 }
1326 VectorIndexImpl::HnswFlat(hnsw_flat) => {
1327 let Some(graph_file) = &hnsw_flat.graph_file else {
1328 return Ok(vec![]);
1329 };
1330
1331 let mut ctx = FileVectorStoreCtx::default();
1332
1333 let graph = self
1334 .sstable_store
1335 .get_hnsw_graph(graph_file, &mut ctx.stats)
1336 .await?;
1337
1338 let vector_store =
1339 FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
1340 let (items, stats) = nearest::<O, M, _>(
1341 &vector_store,
1342 &mut ctx,
1343 &*graph,
1344 target,
1345 on_nearest_item_fn,
1346 options.hnsw_ef_search,
1347 options.top_n,
1348 )
1349 .await?;
1350 ctx.stats.report(table_id, "hnsw_read", self.stats());
1351 report_hnsw_stat(
1352 self.stats(),
1353 table_id,
1354 "hnsw_read",
1355 options.top_n,
1356 options.hnsw_ef_search,
1357 [stats],
1358 );
1359 Ok(items)
1360 }
1361 }
1362 }
1363}
1364
1365#[cfg(test)]
1366mod tests {
1367 use std::collections::{BTreeMap, HashMap, HashSet};
1368 use std::sync::Arc;
1369
1370 use bytes::Bytes;
1371 use prometheus::Registry;
1372 use risingwave_common::catalog::{TableId, TableOption};
1373 use risingwave_common::config::MetricLevel;
1374 use risingwave_common::hash::VirtualNode;
1375 use risingwave_common::util::epoch::test_epoch;
1376 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
1377 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_bytes};
1378 use risingwave_hummock_sdk::key_range::KeyRange;
1379 use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
1380 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
1381 use risingwave_hummock_sdk::version::HummockVersion;
1382 use risingwave_hummock_sdk::{EpochWithGap, HummockSstableObjectId};
1383 use risingwave_pb::hummock::hummock_version::PbLevels;
1384 use risingwave_pb::hummock::{
1385 LevelType as PbLevelType, PbBloomFilterType, PbHummockVersion, PbLevel, PbOverlappingLevel,
1386 PbStateTableInfo, StateTableInfoDelta,
1387 };
1388 use tokio::sync::mpsc::unbounded_channel;
1389
1390 use crate::hummock::HummockValue;
1391 use crate::hummock::iterator::test_utils::mock_sstable_store;
1392 use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
1393 use crate::hummock::store::version::{CommittedVersion, HummockVersionReader};
1394 use crate::hummock::test_utils::{
1395 default_builder_opt_for_test, gen_test_sstable_with_table_ids,
1396 };
1397 use crate::monitor::{HummockStateStoreMetrics, flush_local_metrics_for_test};
1398 use crate::store::ReadOptions;
1399
    /// A `get` for a table id that is absent from every SST's `table_ids`
    /// must return `None`: the single SST here spans a key range covering
    /// tables 50..=150 but only lists tables 50 and 150, while the query
    /// targets table 100.
    #[tokio::test]
    async fn test_get_skips_sst_by_table_id_filter() {
        // Table id deliberately missing from the SST's `table_ids`.
        let query_table_id = TableId::new(100);
        let epoch: u64 = (31 * 1000) << 16;
        let compaction_group_id = StaticCompactionGroupId::StateDefault;

        let sst_info = SstableInfoInner {
            sst_id: 1.into(),
            object_id: 1.into(),
            // Key range straddles the queried table id even though the SST
            // does not contain it.
            key_range: KeyRange {
                left: Bytes::from(
                    FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
                ),
                right: Bytes::from(
                    FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
                ),
                right_exclusive: false,
            },
            table_ids: vec![TableId::new(50), TableId::new(150)],
            file_size: 1024,
            ..Default::default()
        }
        .into();

        let level = Level {
            level_idx: 1,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![sst_info],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        };

        // `member_table_ids` is deprecated but still a mandatory field.
        #[allow(deprecated)]
        let levels = Levels {
            levels: vec![level],
            l0: OverlappingLevel::default(),
            group_id: compaction_group_id,
            parent_group_id: compaction_group_id,
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
            id: 1u64.into(),
            ..Default::default()
        });
        version.levels.insert(compaction_group_id, levels);
        // Register the queried table so the read path can resolve its
        // compaction group and committed epoch.
        version.state_table_info.apply_delta(
            &HashMap::from([(
                query_table_id,
                StateTableInfoDelta {
                    committed_epoch: epoch,
                    compaction_group_id,
                },
            )]),
            &HashSet::new(),
        );

        let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
        let reader = HummockVersionReader::new(
            mock_sstable_store().await,
            Arc::new(HummockStateStoreMetrics::unused()),
            0,
        );

        let result = reader
            .get(
                TableKey(Bytes::from("test_key")),
                epoch,
                query_table_id,
                TableOption::default(),
                ReadOptions::default(),
                (vec![], vec![], pinned_version),
                |_key, _value| Ok(()),
            )
            .await
            .unwrap();

        // The SST is filtered out by table id, so the key is not found.
        assert!(result.is_none());
    }
1486
    /// Builds a single-SST committed version whose SST carries the given
    /// per-vnode key-range statistics.
    ///
    /// `key_range` holds the raw (vnode-prefixed) table-key bounds of the
    /// SST. `level_type` selects placement: `Overlapping` puts the SST into
    /// an L0 sub-level, anything else into level 1. Returns the `SstableInfo`
    /// together with a pinned version containing it.
    #[allow(deprecated)]
    fn build_version_with_vnode_stats(
        table_id: TableId,
        vnode_stats: VnodeStatistics,
        key_range: (Vec<u8>, Vec<u8>),
        level_type: PbLevelType,
    ) -> (SstableInfo, CommittedVersion) {
        let object_id = HummockSstableObjectId::new(1);
        // Encode the raw table-key bounds into full keys at epoch 0.
        let left_full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(Bytes::from(key_range.0)),
            EpochWithGap::new_from_epoch(test_epoch(0)),
        )
        .encode();
        let right_full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(Bytes::from(key_range.1)),
            EpochWithGap::new_from_epoch(test_epoch(0)),
        )
        .encode();

        let sstable_info: SstableInfo = SstableInfoInner {
            object_id,
            sst_id: object_id.as_raw_id().into(),
            key_range: KeyRange {
                left: Bytes::from(left_full_key),
                right: Bytes::from(right_full_key),
                right_exclusive: false,
            },
            file_size: 1,
            table_ids: vec![table_id],
            meta_offset: 0,
            stale_key_count: 0,
            total_key_count: 0,
            min_epoch: 0,
            max_epoch: 0,
            uncompressed_file_size: 0,
            range_tombstone_count: 0,
            bloom_filter_kind: PbBloomFilterType::Sstable,
            sst_size: 1,
            // The statistics under test.
            vnode_statistics: Some(vnode_stats),
        }
        .into();
        let pb_level = PbLevel {
            // Overlapping SSTs live in L0 (level_idx 0); others in level 1.
            level_idx: if level_type == PbLevelType::Overlapping {
                0
            } else {
                1
            },
            level_type: level_type as i32,
            table_infos: vec![sstable_info.clone().into()],
            total_file_size: 1,
            sub_level_id: 0,
            uncompressed_file_size: 1,
            vnode_partition_count: 0,
        };

        let (levels, l0) = if level_type == PbLevelType::Overlapping {
            (
                vec![],
                Some(PbOverlappingLevel {
                    sub_levels: vec![pb_level],
                    total_file_size: 1,
                    uncompressed_file_size: 1,
                }),
            )
        } else {
            (vec![pb_level], Some(PbOverlappingLevel::default()))
        };

        let pb_levels = PbLevels {
            levels,
            l0,
            group_id: StaticCompactionGroupId::NewCompactionGroup,
            parent_group_id: 0.into(),
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        // Minimal version: one compaction group, one registered table.
        let pb_version = PbHummockVersion {
            id: 1.into(),
            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
            max_committed_epoch: 0,
            table_watermarks: HashMap::new(),
            table_change_logs: HashMap::new(),
            state_table_info: HashMap::from_iter([(
                table_id,
                PbStateTableInfo {
                    committed_epoch: 0,
                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
                },
            )]),
            vector_indexes: HashMap::new(),
        };

        let version = HummockVersion::from(&pb_version);
        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
        let pinned = PinnedVersion::new(version, tx);
        (sstable_info, pinned)
    }
1588
    /// Builds a committed version containing the given SSTs in a single level.
    ///
    /// Aggregate file sizes are derived from the SSTs. `level_type` selects
    /// placement: `Overlapping` puts them into an L0 sub-level, anything else
    /// into level 1.
    #[allow(deprecated)]
    fn build_version_from_sstables(
        table_id: TableId,
        sstable_infos: Vec<SstableInfo>,
        level_type: PbLevelType,
    ) -> CommittedVersion {
        let total_file_size = sstable_infos.iter().map(|sst| sst.file_size).sum::<u64>();
        let uncompressed_file_size = sstable_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum::<u64>();
        let pb_level = PbLevel {
            // Overlapping SSTs live in L0 (level_idx 0); others in level 1.
            level_idx: if level_type == PbLevelType::Overlapping {
                0
            } else {
                1
            },
            level_type: level_type as i32,
            table_infos: sstable_infos.into_iter().map(Into::into).collect(),
            total_file_size,
            sub_level_id: 0,
            uncompressed_file_size,
            vnode_partition_count: 0,
        };

        let (levels, l0) = if level_type == PbLevelType::Overlapping {
            (
                vec![],
                Some(PbOverlappingLevel {
                    sub_levels: vec![pb_level],
                    total_file_size,
                    uncompressed_file_size,
                }),
            )
        } else {
            (vec![pb_level], Some(PbOverlappingLevel::default()))
        };

        let pb_levels = PbLevels {
            levels,
            l0,
            group_id: StaticCompactionGroupId::NewCompactionGroup,
            parent_group_id: 0.into(),
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        // Minimal version: one compaction group, one registered table.
        let pb_version = PbHummockVersion {
            id: 1.into(),
            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
            max_committed_epoch: 0,
            table_watermarks: HashMap::new(),
            table_change_logs: HashMap::new(),
            state_table_info: HashMap::from_iter([(
                table_id,
                PbStateTableInfo {
                    committed_epoch: 0,
                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
                },
            )]),
            vector_indexes: HashMap::new(),
        };

        let version = HummockVersion::from(&pb_version);
        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
        PinnedVersion::new(version, tx)
    }
1657
1658 #[allow(deprecated)]
1660 fn build_version_from_sstable(
1661 table_id: TableId,
1662 sstable_info: SstableInfo,
1663 ) -> CommittedVersion {
1664 build_version_from_sstables(table_id, vec![sstable_info], PbLevelType::Nonoverlapping)
1665 }
1666
1667 fn vnode_prune_counts(
1668 metrics: &HummockStateStoreMetrics,
1669 table_id: TableId,
1670 operation: &str,
1671 ) -> (u64, u64) {
1672 let table_label = table_id.to_string();
1673 let checked = metrics
1674 .vnode_pruning_counts
1675 .with_guarded_label_values(&[
1676 table_label.clone(),
1677 operation.to_owned(),
1678 "checked".to_owned(),
1679 ])
1680 .get();
1681 let pruned = metrics
1682 .vnode_pruning_counts
1683 .with_guarded_label_values(&[table_label, operation.to_owned(), "pruned".to_owned()])
1684 .get();
1685 (checked, pruned)
1686 }
1687
1688 async fn assert_vnode_prune_get_skips_out_of_range_key(
1689 table_id: TableId,
1690 epoch: u64,
1691 level_type: PbLevelType,
1692 ) {
1693 let sstable_store = mock_sstable_store().await;
1694 let registry = Registry::new();
1695 let metrics = Arc::new(HummockStateStoreMetrics::new(®istry, MetricLevel::Debug));
1696 let reader = HummockVersionReader::new(sstable_store, metrics.clone(), 0);
1697 let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1698
1699 let make_user_key = |vnode: VirtualNode, suffix: &str| {
1700 let mut raw = vnode.to_be_bytes().to_vec();
1701 raw.extend_from_slice(suffix.as_bytes());
1702 UserKey::new(table_id, TableKey(raw.into()))
1703 };
1704
1705 let vnode_stats = VnodeStatistics::from_map(BTreeMap::from_iter([(
1707 VirtualNode::from_index(1),
1708 (
1709 make_user_key(VirtualNode::from_index(1), "aa"),
1710 make_user_key(VirtualNode::from_index(1), "bb"),
1711 ),
1712 )]));
1713
1714 let key_range = {
1716 let mut left = VirtualNode::from_index(0).to_be_bytes().to_vec();
1717 left.extend_from_slice(b"aa");
1718 let mut right = VirtualNode::from_index(1).to_be_bytes().to_vec();
1719 right.extend_from_slice(b"zzzz");
1720 (left, right)
1721 };
1722
1723 let (_sst, committed) =
1724 build_version_with_vnode_stats(table_id, vnode_stats, key_range, level_type);
1725
1726 let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1728 raw.extend_from_slice(b"zz");
1729 let table_key = TableKey(Bytes::from(raw.clone()));
1730
1731 let result = reader
1732 .get(
1733 table_key,
1734 epoch,
1735 table_id,
1736 TableOption::default(),
1737 ReadOptions::default(),
1738 (vec![], vec![], committed),
1739 |_k, v| Ok(Bytes::copy_from_slice(v)),
1740 )
1741 .await
1742 .unwrap();
1743 flush_local_metrics_for_test();
1744
1745 assert!(
1746 result.is_none(),
1747 "vnode pruning should skip SST without reading data"
1748 );
1749 let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1750 assert_eq!(checked_before + 1, checked_after);
1751 assert_eq!(pruned_before + 1, pruned_after);
1752 }
1753
1754 async fn assert_vnode_prune_get_not_pruned_nonoverlapping() {
1755 let table_id = TableId::new(42);
1756 let epoch = test_epoch(3);
1757 let sstable_store = mock_sstable_store().await;
1758 let registry = Registry::new();
1759 let metrics = Arc::new(HummockStateStoreMetrics::new(®istry, MetricLevel::Debug));
1760 let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1761 let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1762
1763 let mut opts = default_builder_opt_for_test();
1764 opts.max_vnode_key_range_bytes = None;
1765 let mut kvs = vec![
1766 (
1767 FullKey::new_with_gap_epoch(
1768 table_id,
1769 gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1770 EpochWithGap::new_from_epoch(epoch),
1771 ),
1772 HummockValue::put(Bytes::from_static(b"v1")),
1773 ),
1774 (
1775 FullKey::new_with_gap_epoch(
1776 table_id,
1777 gen_key_from_bytes(VirtualNode::ZERO, b"cc"),
1778 EpochWithGap::new_from_epoch(epoch),
1779 ),
1780 HummockValue::put(Bytes::from_static(b"v0")),
1781 ),
1782 ];
1783 kvs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1784 let (_, mut sstable_info): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1785 gen_test_sstable_with_table_ids(
1786 opts,
1787 10,
1788 kvs.into_iter(),
1789 sstable_store.clone(),
1790 vec![table_id.as_raw_id()],
1791 )
1792 .await;
1793 let mut inner = sstable_info.get_inner();
1795 inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1796 VirtualNode::from_index(1),
1797 (
1798 UserKey::new(
1799 table_id,
1800 gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1801 ),
1802 UserKey::new(
1803 table_id,
1804 gen_key_from_bytes(VirtualNode::from_index(1), b"zz"),
1805 ),
1806 ),
1807 )])));
1808 sstable_info = inner.into();
1809 let committed = build_version_from_sstable(table_id, sstable_info.clone());
1810
1811 let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1813 raw.extend_from_slice(b"aa");
1814 let table_key = TableKey(Bytes::from(raw.clone()));
1815
1816 let result = reader
1817 .get(
1818 table_key,
1819 epoch,
1820 table_id,
1821 TableOption::default(),
1822 ReadOptions::default(),
1823 (vec![], vec![], committed),
1824 |_k, v| Ok(Bytes::copy_from_slice(v)),
1825 )
1826 .await
1827 .unwrap();
1828 flush_local_metrics_for_test();
1829 assert!(result.is_some(), "key should be read when not pruned");
1830 let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1831 assert_eq!(checked_before + 1, checked_after);
1832 assert_eq!(pruned_before, pruned_after);
1833 }
1834
1835 #[tokio::test]
1836 async fn test_vnode_prune_get_single_sst_cases() {
1837 assert_vnode_prune_get_skips_out_of_range_key(
1838 TableId::default(),
1839 test_epoch(1),
1840 PbLevelType::Nonoverlapping,
1841 )
1842 .await;
1843 assert_vnode_prune_get_skips_out_of_range_key(
1844 TableId::new(7),
1845 test_epoch(2),
1846 PbLevelType::Overlapping,
1847 )
1848 .await;
1849 assert_vnode_prune_get_not_pruned_nonoverlapping().await;
1850 }
1851
    /// With two overlapping SSTs in L0 whose vnode statistics differ, only
    /// the SST whose recorded range excludes the queried key is pruned; the
    /// other SST is read and yields the value, so "checked" advances by 2
    /// while "pruned" advances by 1.
    #[tokio::test]
    async fn test_vnode_prune_get_overlapping_distribution_prunes_only_out_of_range_sst() {
        let table_id = TableId::new(77);
        let epoch = test_epoch(4);
        let vnode = VirtualNode::from_index(1);
        let sstable_store = mock_sstable_store().await;
        let registry = Registry::new();
        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
        let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");

        let mut opts = default_builder_opt_for_test();
        // Disable builder-generated vnode statistics; they are injected
        // manually below.
        opts.max_vnode_key_range_bytes = None;

        let mut kvs1 = vec![
            (
                FullKey::new_with_gap_epoch(
                    table_id,
                    gen_key_from_bytes(vnode, b"aa"),
                    EpochWithGap::new_from_epoch(epoch),
                ),
                HummockValue::put(Bytes::from_static(b"s1_aa")),
            ),
            (
                FullKey::new_with_gap_epoch(
                    table_id,
                    gen_key_from_bytes(vnode, b"zz"),
                    EpochWithGap::new_from_epoch(epoch),
                ),
                HummockValue::put(Bytes::from_static(b"s1_zz")),
            ),
        ];
        kvs1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
        let (_, mut sst1): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
            gen_test_sstable_with_table_ids(
                opts.clone(),
                11,
                kvs1.into_iter(),
                sstable_store.clone(),
                vec![table_id.as_raw_id()],
            )
            .await;
        // sst1's statistics claim vnode 1 only covers ["aa", "bb"]; the query
        // key "mm" below falls outside, so sst1 must be pruned.
        let mut sst1_inner = sst1.get_inner();
        sst1_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
            vnode,
            (
                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
                UserKey::new(table_id, gen_key_from_bytes(vnode, b"bb")),
            ),
        )])));
        sst1 = sst1_inner.into();

        let mut kvs2 = vec![(
            FullKey::new_with_gap_epoch(
                table_id,
                gen_key_from_bytes(vnode, b"mm"),
                EpochWithGap::new_from_epoch(epoch),
            ),
            HummockValue::put(Bytes::from_static(b"hit")),
        )];
        kvs2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
        let (_, mut sst2): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
            gen_test_sstable_with_table_ids(
                opts,
                12,
                kvs2.into_iter(),
                sstable_store.clone(),
                vec![table_id.as_raw_id()],
            )
            .await;
        // sst2's statistics cover ["aa", "zz"], which contains "mm", so sst2
        // is checked but not pruned and serves the value.
        let mut sst2_inner = sst2.get_inner();
        sst2_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
            vnode,
            (
                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
                UserKey::new(table_id, gen_key_from_bytes(vnode, b"zz")),
            ),
        )])));
        sst2 = sst2_inner.into();

        // Both SSTs land in the same L0 overlapping sub-level.
        let committed =
            build_version_from_sstables(table_id, vec![sst1, sst2], PbLevelType::Overlapping);

        let table_key = gen_key_from_bytes(vnode, b"mm");
        let result = reader
            .get(
                table_key,
                epoch,
                table_id,
                TableOption::default(),
                ReadOptions::default(),
                (vec![], vec![], committed),
                |_k, v| Ok(Bytes::copy_from_slice(v)),
            )
            .await
            .unwrap();
        flush_local_metrics_for_test();

        assert_eq!(result, Some(Bytes::from_static(b"hit")));
        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
        // Both SSTs were checked against their vnode statistics...
        assert_eq!(checked_before + 2, checked_after);
        // ...but only sst1 was pruned.
        assert_eq!(pruned_before + 1, pruned_after);
    }
1956}