1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::{self};
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::array::VectorRef;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{TableId, TableOption};
29use risingwave_common::hash::VirtualNode;
30use risingwave_common::util::epoch::MAX_SPILL_TIMES;
31use risingwave_hummock_sdk::key::{
32 FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
33};
34use risingwave_hummock_sdk::key_range::KeyRangeCommon;
35use risingwave_hummock_sdk::sstable_info::SstableInfo;
36use risingwave_hummock_sdk::table_watermark::{
37 TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
40use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
41use risingwave_pb::hummock::LevelType;
42use sync_point::sync_point;
43use tracing::warn;
44
45use crate::error::StorageResult;
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::iterator::change_log::ChangeLogIterator;
48use crate::hummock::iterator::{
49 BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
50};
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
53use crate::hummock::sstable_store::SstableStoreRef;
54use crate::hummock::table_change_log_manager::TableChangeLogManager;
55use crate::hummock::utils::{
56 MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
57 range_overlap, search_sst_idx,
58};
59use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
60use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
61use crate::hummock::{
62 BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
63 HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
64 ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
65 hit_sstable_filter,
66};
67use crate::mem_table::{
68 ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
69};
70use crate::monitor::{
71 GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
72};
73use crate::store::{
74 OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
75};
76use crate::vector::hnsw::nearest;
77use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
78
79pub type CommittedVersion = PinnedVersion;
80
/// Describes one batch of SSTs held in the staging area, together with the epochs and the
/// immutable memtable (imm) ids it is associated with.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    /// SSTs of this batch; these are the SSTs scanned by `StagingVersion::prune_overlap`.
    sstable_infos: Vec<LocalSstableInfo>,
    /// NOTE(review): presumably the SSTs that store old values (for the change log) — confirm
    /// against the uploader that builds this struct.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs of this batch in non-increasing order (enforced by the assertion in `new`).
    epochs: Vec<HummockEpoch>,
    /// Imm ids of this batch, grouped by the local instance they belong to.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    /// NOTE(review): presumably the total in-memory size of the imms behind this batch —
    /// confirm against the caller of `new`.
    imm_size: usize,
}
98
impl StagingSstableInfo {
    /// Assembles a staging SST batch from its parts.
    ///
    /// # Panics
    ///
    /// Panics if `epochs` is not in non-increasing order, i.e. if any epoch is greater than the
    /// epoch that precedes it.
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // Each epoch must be less than or equal to its predecessor.
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    /// Returns the SSTs of this batch.
    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    /// Returns the `old_value_sstable_infos` passed to `new`.
    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    /// Returns the `imm_size` passed to `new`.
    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    /// Returns the epochs of this batch, in non-increasing order.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    /// Returns the imm ids of this batch, keyed by local instance id.
    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}
138
/// A change to be applied to a `HummockReadVersion` through `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// A batch of staging SSTs; applying it removes the matching imms of the instance from
    /// `uploading_imms` and records the batch in the staging SST queue.
    Sst(Arc<StagingSstableInfo>),
    /// A new committed version; applying it replaces the held committed version and prunes
    /// staging data at or below the table's committed epoch.
    CommittedSnapshot(CommittedVersion),
    /// New per-vnode watermarks of the table for `epoch`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
149
/// Data that is not yet part of the committed version: immutable memtables (imms) and
/// staging SSTs.
pub struct StagingVersion {
    /// Sum of `size()` over `pending_imms`; grown in `add_pending_imm` and reset to 0 in
    /// `start_upload_pending_imms`.
    pending_imm_size: usize,
    /// Imms added through `add_pending_imm` and not yet handed out by
    /// `start_upload_pending_imms`. New imms are pushed to the back, so newer data comes last.
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// Imms handed out for upload (or added through `add_replicated_imm`). New imms are pushed
    /// to the front, so newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Staging SST batches. New batches are pushed to the front, so newer data comes first.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
169
170impl StagingVersion {
171 pub fn prune_overlap<'a>(
174 &'a self,
175 max_epoch_inclusive: HummockEpoch,
176 table_id: TableId,
177 table_key_range: &'a TableKeyRange,
178 ) -> (
179 impl Iterator<Item = &'a ImmutableMemtable> + 'a,
180 impl Iterator<Item = &'a SstableInfo> + 'a,
181 ) {
182 let (left, right) = table_key_range;
183 let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
184 let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
185 let overlapped_imms = self
186 .pending_imms
187 .iter()
188 .map(|(imm, _)| imm)
189 .rev() .chain(self.uploading_imms.iter())
191 .filter(move |imm| {
192 imm.epoch() <= max_epoch_inclusive
194 && imm.table_id == table_id
195 && range_overlap(
196 &(left, right),
197 &imm.start_table_key(),
198 Bound::Included(&imm.end_table_key()),
199 )
200 });
201
202 let overlapped_ssts = self
204 .sst
205 .iter()
206 .filter(move |staging_sst| {
207 let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
208 sst_max_epoch <= max_epoch_inclusive
209 })
210 .flat_map(move |staging_sst| {
211 staging_sst
214 .sstable_infos
215 .iter()
216 .map(|sstable| &sstable.sst_info)
217 .filter(move |sstable: &&SstableInfo| {
218 filter_single_sst(sstable, table_id, table_key_range)
219 })
220 });
221 (overlapped_imms, overlapped_ssts)
222 }
223
224 pub fn is_empty(&self) -> bool {
225 self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
226 }
227}
228
/// The readable state of one local state store instance of a table: its staging data, the
/// committed version it reads from, and the table watermarks known so far.
pub struct HummockReadVersion {
    /// Table this read version belongs to.
    table_id: TableId,
    /// Local instance id; used to find this instance's imm ids in a `StagingSstableInfo`.
    instance_id: LocalInstanceId,

    /// Set once by `init`; adding imms and starting uploads require it to be `true`.
    is_initialized: bool,

    /// Imms and SSTs that are not yet committed.
    staging: StagingVersion,

    /// The committed version most recently applied through `update`.
    committed: CommittedVersion,

    /// Whether imms arrive through `add_replicated_imm` instead of `add_pending_imm`.
    /// A replicated read version must not receive `VersionUpdate::Sst`.
    is_replicated: bool,

    /// Watermark index of the table; `None` until a watermark is seen in a committed version
    /// or a `VersionUpdate::NewTableWatermark`.
    table_watermarks: Option<TableWatermarksIndex>,

    /// Bitmap of vnodes; queried by `contains` and replaced by `update_vnode_bitmap`.
    vnodes: Arc<Bitmap>,
}
254
255impl HummockReadVersion {
256 pub fn new_with_replication_option(
257 table_id: TableId,
258 instance_id: LocalInstanceId,
259 committed_version: CommittedVersion,
260 is_replicated: bool,
261 vnodes: Arc<Bitmap>,
262 ) -> Self {
263 assert!(committed_version.is_valid());
267 Self {
268 table_id,
269 instance_id,
270 table_watermarks: {
271 match committed_version.table_watermarks.get(&table_id) {
272 Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
273 table_watermarks.clone(),
274 committed_version
275 .state_table_info
276 .info()
277 .get(&table_id)
278 .expect("should exist")
279 .committed_epoch,
280 table_watermarks.watermark_type,
281 )),
282 None => None,
283 }
284 },
285 staging: StagingVersion {
286 pending_imm_size: 0,
287 pending_imms: Vec::default(),
288 uploading_imms: VecDeque::default(),
289 sst: VecDeque::default(),
290 },
291
292 committed: committed_version,
293
294 is_replicated,
295 vnodes,
296 is_initialized: false,
297 }
298 }
299
300 pub fn new(
301 table_id: TableId,
302 instance_id: LocalInstanceId,
303 committed_version: CommittedVersion,
304 vnodes: Arc<Bitmap>,
305 ) -> Self {
306 Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
307 }
308
/// Returns the id of the table this read version belongs to.
pub fn table_id(&self) -> TableId {
    self.table_id
}
312
/// Marks the read version as initialized.
///
/// # Panics
///
/// Panics if it has already been initialized.
pub fn init(&mut self) {
    assert!(!self.is_initialized);
    self.is_initialized = true;
}
317
318 pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
319 assert!(self.is_initialized);
320 assert!(!self.is_replicated);
321 if let Some(item) = self
322 .staging
323 .pending_imms
324 .last()
325 .map(|(imm, _)| imm)
326 .or_else(|| self.staging.uploading_imms.front())
327 {
328 assert!(item.batch_id() < imm.batch_id());
330 }
331
332 self.staging.pending_imm_size += imm.size();
333 self.staging.pending_imms.push((imm, tracker));
334 }
335
336 pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
337 assert!(self.is_initialized);
338 assert!(self.is_replicated);
339 assert!(self.staging.pending_imms.is_empty());
340 if let Some(item) = self.staging.uploading_imms.front() {
341 assert!(item.batch_id() < imm.batch_id());
343 }
344 self.staging.uploading_imms.push_front(imm);
345 }
346
/// Returns the accumulated size of the imms currently pending.
pub fn pending_imm_size(&self) -> usize {
    self.staging.pending_imm_size
}
350
351 pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
352 assert!(self.is_initialized);
353 assert!(!self.is_replicated);
354 let pending_imms = std::mem::take(&mut self.staging.pending_imms);
355 for (imm, _) in &pending_imms {
356 self.staging.uploading_imms.push_front(imm.clone());
357 }
358 self.staging.pending_imm_size = 0;
359 pending_imms
360 }
361
/// Applies a `VersionUpdate` to this read version.
///
/// - `Sst`: pops this instance's imms referenced by the batch from the back of
///   `uploading_imms` (checking that ids match) and pushes the batch to the front of the
///   staging SSTs. If the batch has no imm ids for this instance, a warning is logged and
///   nothing changes.
/// - `CommittedSnapshot`: if the new version has info for this table, prunes staging data
///   at or below the table's committed epoch and applies committed watermarks; the held
///   committed version is replaced in any case.
/// - `NewTableWatermark`: adds the watermarks to the existing index or creates the index.
///
/// # Panics
///
/// Panics if an `Sst` update is applied to a replicated read version, if the imm ids of an
/// `Sst` update do not match the uploading imms, or if a non-replicated read version holds
/// an imm whose epoch is not greater than the newly committed epoch.
pub fn update(&mut self, info: VersionUpdate) {
    match info {
        VersionUpdate::Sst(staging_sst_ref) => {
            {
                assert!(!self.is_replicated);
                let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                    warn!(
                        instance_id = self.instance_id,
                        "no related imm in sst input"
                    );
                    return;
                };

                // Walk the ids in reverse while popping from the back of `uploading_imms`
                // (its oldest end, since new imms are pushed to the front); every popped imm
                // must carry the expected batch id.
                for imm_id in imms.iter().rev() {
                    let check_err = match self.staging.uploading_imms.pop_back() {
                        None => Some("empty".to_owned()),
                        Some(prev_imm_id) => {
                            if prev_imm_id.batch_id() == *imm_id {
                                None
                            } else {
                                Some(format!(
                                    "miss match id {} {}",
                                    prev_imm_id.batch_id(),
                                    *imm_id
                                ))
                            }
                        }
                    };
                    assert!(
                        check_err.is_none(),
                        "should be valid staging_sst.size {},
 staging_sst.imm_ids {:?},
 staging_sst.epochs {:?},
 local_pending_imm_ids {:?},
 local_uploading_imm_ids {:?},
 instance_id {}
 check_err {:?}",
                        staging_sst_ref.imm_size,
                        staging_sst_ref.imm_ids,
                        staging_sst_ref.epochs,
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm.batch_id())
                            .collect_vec(),
                        self.staging
                            .uploading_imms
                            .iter()
                            .map(|imm| imm.batch_id())
                            .collect_vec(),
                        self.instance_id,
                        check_err
                    );
                }

                // Newer staging SST batches sit at the front.
                self.staging.sst.push_front(staging_sst_ref);
            }
        }

        VersionUpdate::CommittedSnapshot(committed_version) => {
            if let Some(info) = committed_version
                .state_table_info
                .info()
                .get(&self.table_id)
            {
                let committed_epoch = info.committed_epoch;
                if self.is_replicated {
                    // Replicated imms are never replaced by staging SSTs here, so drop
                    // the ones that are covered by the committed epoch.
                    self.staging
                        .uploading_imms
                        .retain(|imm| imm.epoch() > committed_epoch);
                    self.staging
                        .pending_imms
                        .retain(|(imm, _)| imm.epoch() > committed_epoch);
                } else {
                    // For a non-replicated read version no remaining imm may be at or
                    // below the committed epoch.
                    self.staging
                        .pending_imms
                        .iter()
                        .map(|(imm, _)| imm)
                        .chain(self.staging.uploading_imms.iter())
                        .for_each(|imm| {
                            assert!(
                                imm.epoch() > committed_epoch,
                                "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                imm.table_id,
                                imm.epoch(),
                                committed_epoch
                            )
                        });
                }

                // Keep only staging SST batches whose first (greatest) epoch is above the
                // committed epoch.
                self.staging.sst.retain(|sst| {
                    sst.epochs.first().expect("epochs not empty") > &committed_epoch
                });

                // Every retained batch must lie entirely above the committed epoch: its
                // last (smallest) epoch is checked as well.
                assert!(self.staging.sst.iter().all(|sst| {
                    sst.epochs.last().expect("epochs not empty") > &committed_epoch
                }));

                if let Some(committed_watermarks) =
                    committed_version.table_watermarks.get(&self.table_id)
                {
                    if let Some(watermark_index) = &mut self.table_watermarks {
                        watermark_index.apply_committed_watermarks(
                            committed_watermarks.clone(),
                            committed_epoch,
                        );
                    } else {
                        self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                            committed_watermarks.clone(),
                            committed_epoch,
                            committed_watermarks.watermark_type,
                        ));
                    }
                }
            }

            // The committed version is replaced even when it has no info for this table.
            self.committed = committed_version;
        }
        VersionUpdate::NewTableWatermark {
            direction,
            epoch,
            vnode_watermarks,
            watermark_type,
        } => {
            if let Some(watermark_index) = &mut self.table_watermarks {
                watermark_index.add_epoch_watermark(
                    epoch,
                    Arc::from(vnode_watermarks),
                    direction,
                );
            } else {
                self.table_watermarks = Some(TableWatermarksIndex::new(
                    direction,
                    epoch,
                    vnode_watermarks,
                    self.committed.table_committed_epoch(self.table_id),
                    watermark_type,
                ));
            }
        }
    }
}
515
/// Returns the staging (not yet committed) part of this read version.
pub fn staging(&self) -> &StagingVersion {
    &self.staging
}
519
/// Returns the committed version currently held by this read version.
pub fn committed(&self) -> &CommittedVersion {
    &self.committed
}
523
524 pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
529 if let Some(watermark_index) = &self.table_watermarks {
530 watermark_index.filter_regress_watermarks(watermarks)
531 }
532 }
533
534 pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
535 self.table_watermarks
536 .as_ref()
537 .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
538 }
539
/// Returns whether `init` has been called on this read version.
pub fn is_initialized(&self) -> bool {
    self.is_initialized
}
543
/// Returns whether this read version was created as a replicated one.
pub fn is_replicated(&self) -> bool {
    self.is_replicated
}
547
548 pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
549 std::mem::replace(&mut self.vnodes, vnodes)
550 }
551
/// Returns whether the bit of `vnode` is set in the vnode bitmap of this read version.
pub fn contains(&self, vnode: VirtualNode) -> bool {
    self.vnodes.is_set(vnode.to_index())
}
555
556 pub fn vnodes(&self) -> Arc<Bitmap> {
557 self.vnodes.clone()
558 }
559}
560
561pub fn read_filter_for_version(
562 epoch: HummockEpoch,
563 table_id: TableId,
564 mut table_key_range: TableKeyRange,
565 read_version: &RwLock<HummockReadVersion>,
566) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
567 let read_version_guard = read_version.read();
568
569 let committed_version = read_version_guard.committed().clone();
570
571 if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
572 watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
573 }
574
575 let (imm_iter, sst_iter) =
576 read_version_guard
577 .staging()
578 .prune_overlap(epoch, table_id, &table_key_range);
579
580 let imms = imm_iter.cloned().collect();
581 let ssts = sst_iter.cloned().collect();
582
583 Ok((table_key_range, (imms, ssts, committed_version)))
584}
585
/// Executes reads (point gets, iterators, change log reads) against a snapshot of a read
/// version, fetching SSTs through the SST store.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store used to load SST metadata and blocks.
    sstable_store: SstableStoreRef,

    /// Metrics sink handed to the per-read metrics guards and iterators.
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// Copied into `max_preload_retry_times` of the SST read options when prefetch is enabled.
    preload_retry_times: usize,
}
594
impl HummockVersionReader {
    /// Creates a reader over `sstable_store` that reports to `state_store_metrics` and uses
    /// `preload_retry_times` for prefetching iterators.
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    /// Returns the state store metrics this reader reports to.
    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}
614
/// Threshold in seconds: when `iter_inner` spends longer than this on the committed levels
/// while building an iterator, it logs a warning and records the meta cache misses.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
616
617impl HummockVersionReader {
618 fn skip_get_by_vnode_user_key_range(
619 sstable_info: &SstableInfo,
620 vnode: VirtualNode,
621 user_key: UserKey<&[u8]>,
622 local_stats: &mut StoreLocalStatistic,
623 ) -> bool {
624 if let Some(vnode_statistics) = &sstable_info.vnode_statistics {
625 if let Some((vnode_min, vnode_max)) = vnode_statistics.get_vnode_user_key_range(vnode) {
628 local_stats.vnode_checked_get_count += 1;
629 if user_key < vnode_min.as_ref() || user_key > vnode_max.as_ref() {
630 local_stats.vnode_pruned_get_count += 1;
631 return true;
632 }
633 }
634 }
635 false
636 }
637
/// Point lookup of `table_key` in `table_id` as of `epoch`.
///
/// The sources in `read_version_tuple` are searched in order: imms, then uncommitted
/// (staging) SSTs, then the levels of the committed version. The first source that yields
/// an entry decides the result: if the entry's pure epoch is below the minimum epoch derived
/// from `table_option.retention_seconds`, `None` is returned; otherwise the entry is
/// converted with `into_user_value` and, if that yields a value, passed to
/// `on_key_value_fn` together with the full key.
///
/// Returns `Ok(None)` when no source has an entry for the key.
///
/// # Errors
///
/// Propagates errors from reading SSTs and from `on_key_value_fn`.
///
/// # Panics
///
/// Panics if the committed version is not valid.
pub async fn get<'a, O>(
    &'a self,
    table_key: TableKey<Bytes>,
    epoch: u64,
    table_id: TableId,
    table_option: TableOption,
    read_options: ReadOptions,
    read_version_tuple: ReadVersionTuple,
    on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
) -> StorageResult<Option<O>> {
    let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

    let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
    let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
    let local_stats = &mut stats_guard.local_stats;
    // Assume a hit; reset to `false` at the very end when every source missed.
    local_stats.found_key = true;

    // 1. Search the immutable memtables.
    for imm in &imms {
        // An imm older than the minimum epoch cannot contribute a visible entry.
        if imm.epoch() < min_epoch {
            continue;
        }

        local_stats.staging_imm_get_count += 1;

        if let Some((data, data_epoch)) = get_from_batch(
            imm,
            TableKey(table_key.as_ref()),
            epoch,
            &read_options,
            local_stats,
        ) {
            return Ok(if data_epoch.pure_epoch() < min_epoch {
                None
            } else {
                data.into_user_value()
                    .map(|v| {
                        on_key_value_fn(
                            FullKey::new_with_gap_epoch(
                                table_id,
                                table_key.to_ref(),
                                data_epoch,
                            ),
                            v.as_ref(),
                        )
                    })
                    .transpose()?
            });
        }
    }

    // 2. Search the SSTs. The prefix hint, if any, is hashed once for SST filter checks.
    let dist_key_hash = read_options
        .prefix_hint
        .as_ref()
        .map(|dist_key| Sstable::hash_for_filter(dist_key.as_ref(), table_id.as_raw_id()));

    // The lookup key is built from `epoch` together with `MAX_SPILL_TIMES`.
    // NOTE(review): presumably so that entries with any spill offset within `epoch` are
    // covered by the seek — confirm against `EpochWithGap`.
    let full_key = FullKey::new_with_gap_epoch(
        table_id,
        TableKey(table_key.clone()),
        EpochWithGap::new(epoch, MAX_SPILL_TIMES),
    );
    let single_table_key_range = table_key.clone()..=table_key.clone();

    // 2a. Uncommitted SSTs, pruned to those overlapping the single-key range.
    let pruned_uncommitted_ssts =
        prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
    for local_sst in pruned_uncommitted_ssts {
        local_stats.staging_sst_get_count += 1;
        if let Some(iter) = get_from_sstable_info(
            self.sstable_store.clone(),
            local_sst,
            full_key.to_ref(),
            &read_options,
            dist_key_hash,
            local_stats,
        )
        .await?
        {
            debug_assert!(iter.is_valid());
            let data_epoch = iter.key().epoch_with_gap;
            return Ok(if data_epoch.pure_epoch() < min_epoch {
                None
            } else {
                iter.value()
                    .into_user_value()
                    .map(|v| {
                        on_key_value_fn(
                            FullKey::new_with_gap_epoch(
                                table_id,
                                table_key.to_ref(),
                                data_epoch,
                            ),
                            v,
                        )
                    })
                    .transpose()?
            });
        }
    }
    // 2b. SSTs of the committed version, level by level.
    assert!(committed_version.is_valid());
    for level in committed_version.levels(table_id) {
        if level.table_infos.is_empty() {
            continue;
        }

        match level.level_type {
            LevelType::Overlapping | LevelType::Unspecified => {
                // Several SSTs of the level may cover the key; try each candidate.
                let sstable_infos = prune_overlapping_ssts(
                    &level.table_infos,
                    table_id,
                    &single_table_key_range,
                );
                for sstable_info in sstable_infos {
                    // Skip SSTs whose per-vnode key range excludes the key.
                    if Self::skip_get_by_vnode_user_key_range(
                        sstable_info,
                        VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                        full_key.user_key.as_ref(),
                        local_stats,
                    ) {
                        continue;
                    }

                    local_stats.overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        sstable_info,
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
            LevelType::Nonoverlapping => {
                // At most one SST of the level is a candidate: the one just before the
                // index returned by `search_sst_idx`. Index 0 means there is none.
                let mut table_info_idx =
                    search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                if table_info_idx == 0 {
                    continue;
                }
                table_info_idx = table_info_idx.saturating_sub(1);
                let sstable_info = &level.table_infos[table_info_idx];

                // The candidate SST must contain data of this table.
                if sstable_info.table_ids.binary_search(&table_id).is_err() {
                    continue;
                }

                // The key lies beyond the right end of the candidate, i.e. no SST of this
                // level covers it.
                let ord = sstable_info
                    .key_range
                    .compare_right_with_user_key(full_key.user_key.as_ref());
                if ord == Ordering::Less {
                    sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                    continue;
                }

                // Skip the SST when its per-vnode key range excludes the key.
                if Self::skip_get_by_vnode_user_key_range(
                    sstable_info,
                    VirtualNode::from_index(full_key.user_key.get_vnode_id()),
                    full_key.user_key.as_ref(),
                    local_stats,
                ) {
                    continue;
                }

                local_stats.non_overlapping_get_count += 1;
                if let Some(iter) = get_from_sstable_info(
                    self.sstable_store.clone(),
                    sstable_info,
                    full_key.to_ref(),
                    &read_options,
                    dist_key_hash,
                    local_stats,
                )
                .await?
                {
                    debug_assert!(iter.is_valid());
                    let data_epoch = iter.key().epoch_with_gap;
                    return Ok(if data_epoch.pure_epoch() < min_epoch {
                        None
                    } else {
                        iter.value()
                            .into_user_value()
                            .map(|v| {
                                on_key_value_fn(
                                    FullKey::new_with_gap_epoch(
                                        table_id,
                                        table_key.to_ref(),
                                        data_epoch,
                                    ),
                                    v,
                                )
                            })
                            .transpose()?
                    });
                }
            }
        }
    }
    // Every source missed.
    stats_guard.local_stats.found_key = false;
    Ok(None)
}
871
872 pub async fn iter(
873 &self,
874 table_key_range: TableKeyRange,
875 epoch: u64,
876 table_id: TableId,
877 table_option: TableOption,
878 read_options: ReadOptions,
879 read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
880 ) -> StorageResult<HummockStorageIterator> {
881 self.iter_with_memtable(
882 table_key_range,
883 epoch,
884 table_id,
885 table_option,
886 read_options,
887 read_version_tuple,
888 None,
889 )
890 .await
891 }
892
893 pub async fn iter_with_memtable<'b>(
894 &self,
895 table_key_range: TableKeyRange,
896 epoch: u64,
897 table_id: TableId,
898 table_option: TableOption,
899 read_options: ReadOptions,
900 read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
901 memtable_iter: Option<MemTableHummockIterator<'b>>,
902 ) -> StorageResult<HummockStorageIteratorInner<'b>> {
903 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
904 let user_key_range = (
905 user_key_range_ref.0.map(|key| key.cloned()),
906 user_key_range_ref.1.map(|key| key.cloned()),
907 );
908 let mut factory = ForwardIteratorFactory::default();
909 let mut local_stats = StoreLocalStatistic::default();
910 let (imms, uncommitted_ssts, committed) = read_version_tuple;
911 let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
912 self.iter_inner(
913 table_key_range,
914 epoch,
915 table_id,
916 read_options,
917 imms,
918 uncommitted_ssts,
919 &committed,
920 &mut local_stats,
921 &mut factory,
922 )
923 .await?;
924 let merge_iter = factory.build(memtable_iter);
925 let mut user_iter = UserIterator::new(
927 merge_iter,
928 user_key_range,
929 epoch,
930 min_epoch,
931 Some(committed),
932 );
933 user_iter.rewind().await?;
934 Ok(HummockStorageIteratorInner::new(
935 user_iter,
936 self.state_store_metrics.clone(),
937 table_id,
938 local_stats,
939 ))
940 }
941
942 pub async fn rev_iter<'b>(
943 &self,
944 table_key_range: TableKeyRange,
945 epoch: u64,
946 table_id: TableId,
947 table_option: TableOption,
948 read_options: ReadOptions,
949 read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
950 memtable_iter: Option<MemTableHummockRevIterator<'b>>,
951 ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
952 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
953 let user_key_range = (
954 user_key_range_ref.0.map(|key| key.cloned()),
955 user_key_range_ref.1.map(|key| key.cloned()),
956 );
957 let mut factory = BackwardIteratorFactory::default();
958 let mut local_stats = StoreLocalStatistic::default();
959 let (imms, uncommitted_ssts, committed) = read_version_tuple;
960 let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
961 self.iter_inner(
962 table_key_range,
963 epoch,
964 table_id,
965 read_options,
966 imms,
967 uncommitted_ssts,
968 &committed,
969 &mut local_stats,
970 &mut factory,
971 )
972 .await?;
973 let merge_iter = factory.build(memtable_iter);
974 let mut user_iter = BackwardUserIterator::new(
976 merge_iter,
977 user_key_range,
978 epoch,
979 min_epoch,
980 Some(committed),
981 );
982 user_iter.rewind().await?;
983 Ok(HummockStorageRevIteratorInner::new(
984 user_iter,
985 self.state_store_metrics.clone(),
986 table_id,
987 local_stats,
988 ))
989 }
990
/// Registers every source a range read needs with `factory`: the imms, the uncommitted
/// SSTs, and the SSTs of the committed version that may overlap `table_key_range`.
///
/// When `read_options.prefix_hint` is set, an SST whose filter rules out the prefix is
/// skipped, except for non-overlapping levels that contribute more than one SST, which are
/// added as a single concat iterator without a filter check. Per-source counters are
/// written to `local_stats`. If handling the committed levels takes longer than
/// `SLOW_ITER_FETCH_META_DURATION_SECOND`, a warning is logged.
///
/// # Errors
///
/// Returns an error when the key range is inverted (release builds) and propagates errors
/// from loading SSTs.
///
/// # Panics
///
/// With debug assertions enabled, panics when the right bound of the key range is smaller
/// than the left bound. Also panics if a loaded SST's id does not match the object id of the
/// requested SST info in an overlapping level.
async fn iter_inner<F: IteratorFactory>(
    &self,
    table_key_range: TableKeyRange,
    epoch: u64,
    table_id: TableId,
    read_options: ReadOptions,
    imms: Vec<ImmutableMemtable>,
    uncommitted_ssts: Vec<SstableInfo>,
    committed: &CommittedVersion,
    local_stats: &mut StoreLocalStatistic,
    factory: &mut F,
) -> StorageResult<()> {
    // Reject ranges whose right bound is below the left bound (unbounded sides are fine).
    {
        fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
            match bound {
                Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                Bound::Unbounded => None,
            }
        }
        let (left, right) = &table_key_range;
        if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
            && right < left
        {
            if cfg!(debug_assertions) {
                panic!("invalid iter key range: {table_id} {left:?} {right:?}")
            } else {
                return Err(HummockError::other(format!(
                    "invalid iter key range: {table_id} {left:?} {right:?}"
                ))
                .into());
            }
        }
    }

    // 1. Immutable memtables.
    local_stats.staging_imm_iter_count = imms.len() as u64;
    for imm in imms {
        factory.add_batch_iter(imm);
    }

    // 2. SSTs. They are matched against the user key range (table id + table key).
    let user_key_range = bound_table_key_range(table_id, &table_key_range);
    let user_key_range_ref = (
        user_key_range.0.as_ref().map(UserKey::as_ref),
        user_key_range.1.as_ref().map(UserKey::as_ref),
    );
    let mut staging_sst_iter_count = 0;
    // Hash the prefix hint once; it is reused for every SST filter check below.
    let filter_prefix_hash = read_options
        .prefix_hint
        .as_ref()
        .map(|hint| Sstable::hash_for_filter(hint, table_id.as_raw_id()));
    let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
    if read_options.prefetch_options.prefetch {
        // With prefetch enabled, pass the end of the range and the retry budget on to the
        // SST iterators.
        sst_read_options.must_iterated_end_user_key =
            Some(user_key_range.1.map(|key| key.cloned()));
        sst_read_options.max_preload_retry_times = self.preload_retry_times;
    }
    let sst_read_options = Arc::new(sst_read_options);
    // 2a. Uncommitted (staging) SSTs.
    for sstable_info in &uncommitted_ssts {
        let table_holder = self
            .sstable_store
            .sstable(sstable_info, local_stats)
            .await?;

        if let Some(prefix_hash) = filter_prefix_hash.as_ref()
            && !hit_sstable_filter(
                &table_holder,
                &user_key_range_ref,
                *prefix_hash,
                local_stats,
            )
        {
            continue;
        }

        staging_sst_iter_count += 1;
        factory.add_staging_sst_iter(F::SstableIteratorType::create(
            table_holder,
            self.sstable_store.clone(),
            sst_read_options.clone(),
            sstable_info,
        ));
    }
    local_stats.staging_sst_iter_count = staging_sst_iter_count;

    // Time the handling of the committed levels for the slow-fetch warning below.
    let timer = Instant::now();

    // 2b. SSTs of the committed version.
    for level in committed.levels(table_id) {
        if level.table_infos.is_empty() {
            continue;
        }

        if level.level_type == LevelType::Nonoverlapping {
            let mut table_infos =
                prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                    .peekable();

            if table_infos.peek().is_none() {
                continue;
            }
            let sstable_infos = table_infos.cloned().collect_vec();
            if sstable_infos.len() > 1 {
                // Several SSTs of a non-overlapping level are read through one concat
                // iterator.
                factory.add_concat_sst_iter(
                    sstable_infos,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                );
                local_stats.non_overlapping_iter_count += 1;
            } else {
                let sstable_info = &sstable_infos[0];

                let sstable = self
                    .sstable_store
                    .sstable(sstable_info, local_stats)
                    .await?;

                if let Some(dist_hash) = filter_prefix_hash.as_ref()
                    && !hit_sstable_filter(
                        &sstable,
                        &user_key_range_ref,
                        *dist_hash,
                        local_stats,
                    )
                {
                    continue;
                }
                // A single SST needs no concat iterator; it is added like an overlapping
                // SST but still counted as a non-overlapping iterator in the stats.
                factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                    sstable,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                    sstable_info,
                ));
                local_stats.non_overlapping_iter_count += 1;
            }
        } else {
            let table_infos =
                prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
            // Candidates of an overlapping level are visited in reverse order.
            let fetch_meta_req = table_infos.rev().collect_vec();
            if fetch_meta_req.is_empty() {
                continue;
            }
            for sstable_info in fetch_meta_req {
                let sstable = self
                    .sstable_store
                    .sstable(sstable_info, local_stats)
                    .await?;
                assert_eq!(sstable_info.object_id, sstable.id);
                if let Some(dist_hash) = filter_prefix_hash.as_ref()
                    && !hit_sstable_filter(
                        &sstable,
                        &user_key_range_ref,
                        *dist_hash,
                        local_stats,
                    )
                {
                    continue;
                }
                factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                    sstable,
                    self.sstable_store.clone(),
                    sst_read_options.clone(),
                    sstable_info,
                ));
                local_stats.overlapping_iter_count += 1;
            }
        }
    }
    // Report unusually slow handling of the committed levels.
    let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
    if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
        let table_id_string = table_id.to_string();
        tracing::warn!(
            "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
            table_id_string,
            epoch,
            fetch_meta_duration_sec,
            local_stats.cache_meta_block_miss
        );
        self.state_store_metrics
            .iter_slow_fetch_meta_cache_unhits
            .set(local_stats.cache_meta_block_miss as i64);
    }
    Ok(())
}
1182
/// Builds an iterator over the change log of `options.table_id` for `epoch_range` and
/// `key_range`.
///
/// The change log entries are fetched through `table_change_log_manager` and filtered to
/// `epoch_range`. Their new-value and old-value SSTs that pass `filter_single_sst` for the
/// table and key range are opened and merged into one iterator each, and both are handed to
/// `ChangeLogIterator::new`. A warning is logged when the last fetched entry does not
/// contain the upper epoch of `epoch_range`.
///
/// # Errors
///
/// Propagates errors from fetching the change logs, loading SSTs and creating the iterator.
pub async fn iter_log(
    &self,
    epoch_range: (u64, u64),
    key_range: TableKeyRange,
    options: ReadLogOptions,
    table_change_log_manager: Arc<TableChangeLogManager>,
) -> HummockResult<ChangeLogIterator> {
    // Change log entries of the table, restricted to the requested epoch range; empty when
    // the table has no change log.
    let change_log: Vec<_> = {
        let table_change_logs = table_change_log_manager
            .fetch_table_change_logs(options.table_id, epoch_range, false, None)
            .await?;
        if let Some(change_log) = table_change_logs.get(&options.table_id) {
            change_log.filter_epoch(epoch_range).cloned().collect_vec()
        } else {
            Vec::new()
        }
    };

    // Only warn (do not fail) when the upper epoch is missing from the last entry.
    if let Some(max_epoch_change_log) = change_log.last() {
        let (_, max_epoch) = epoch_range;
        if !max_epoch_change_log.epochs().contains(&max_epoch) {
            warn!(
                max_epoch,
                change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                table_id = %options.table_id,
                "max_epoch does not exist"
            );
        }
    }
    // Change log reads use default caching and no prefetching.
    let read_options = Arc::new(SstableIteratorReadOptions {
        cache_policy: Default::default(),
        must_iterated_end_user_key: None,
        max_preload_retry_times: 0,
        prefetch_for_large_query: false,
    });

    /// Loads all given SSTs concurrently and merges their iterators; the statistics of
    /// each load are added to `local_stat`.
    async fn make_iter(
        sstable_infos: impl Iterator<Item = &SstableInfo>,
        sstable_store: &SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
        local_stat: &mut StoreLocalStatistic,
    ) -> HummockResult<MergeIterator<SstableIterator>> {
        let iters = try_join_all(sstable_infos.map(|sstable_info| {
            let sstable_store = sstable_store.clone();
            let read_options = read_options.clone();
            async move {
                // Each future collects into its own statistics, merged after the join.
                let mut local_stat = StoreLocalStatistic::default();
                let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                Ok::<_, HummockError>((
                    SstableIterator::new(
                        table_holder,
                        sstable_store,
                        read_options,
                        sstable_info,
                    ),
                    local_stat,
                ))
            }
        }))
        .await?;
        Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
            |(iter, stats)| {
                local_stat.add(&stats);
                iter
            },
        )))
    }

    let mut local_stat = StoreLocalStatistic::default();

    let new_value_iter = make_iter(
        change_log
            .iter()
            .flat_map(|log| log.new_value.iter())
            .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
        &self.sstable_store,
        read_options.clone(),
        &mut local_stat,
    )
    .await?;
    let old_value_iter = make_iter(
        change_log
            .iter()
            .flat_map(|log| log.old_value.iter())
            .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
        &self.sstable_store,
        read_options.clone(),
        &mut local_stat,
    )
    .await?;
    ChangeLogIterator::new(
        epoch_range,
        key_range,
        new_value_iter,
        old_value_iter,
        options.table_id,
        IterLocalMetricsGuard::new(
            self.state_store_metrics.clone(),
            options.table_id,
            local_stat,
        ),
    )
    .await
}
1288
1289 pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
1290 &'a self,
1291 version: PinnedVersion,
1292 table_id: TableId,
1293 target: VectorRef<'a>,
1294 options: VectorNearestOptions,
1295 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1296 ) -> HummockResult<Vec<O>> {
1297 let Some(index) = version.vector_indexes.get(&table_id) else {
1298 return Ok(vec![]);
1299 };
1300 if target.dimension() != index.dimension {
1301 return Err(HummockError::other(format!(
1302 "target dimension {} not match index dimension {}",
1303 target.dimension(),
1304 index.dimension
1305 )));
1306 }
1307 match &index.inner {
1308 VectorIndexImpl::Flat(flat) => {
1309 let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
1310 let mut cache_stat = VectorStoreCacheStats::default();
1311 for vector_file in &flat.vector_store_info.vector_files {
1312 let meta = self
1313 .sstable_store
1314 .get_vector_file_meta(vector_file, &mut cache_stat)
1315 .await?;
1316 for (i, block_meta) in meta.block_metas.iter().enumerate() {
1317 let block = self
1318 .sstable_store
1319 .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
1320 .await?;
1321 builder.add(&**block, &on_nearest_item_fn);
1322 }
1323 }
1324 cache_stat.report(table_id, "flat", self.stats());
1325 Ok(builder.finish())
1326 }
1327 VectorIndexImpl::HnswFlat(hnsw_flat) => {
1328 let Some(graph_file) = &hnsw_flat.graph_file else {
1329 return Ok(vec![]);
1330 };
1331
1332 let mut ctx = FileVectorStoreCtx::default();
1333
1334 let graph = self
1335 .sstable_store
1336 .get_hnsw_graph(graph_file, &mut ctx.stats)
1337 .await?;
1338
1339 let vector_store =
1340 FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
1341 let (items, stats) = nearest::<O, M, _>(
1342 &vector_store,
1343 &mut ctx,
1344 &*graph,
1345 target,
1346 on_nearest_item_fn,
1347 options.hnsw_ef_search,
1348 options.top_n,
1349 )
1350 .await?;
1351 ctx.stats.report(table_id, "hnsw_read", self.stats());
1352 report_hnsw_stat(
1353 self.stats(),
1354 table_id,
1355 "hnsw_read",
1356 options.top_n,
1357 options.hnsw_ef_search,
1358 [stats],
1359 );
1360 Ok(items)
1361 }
1362 }
1363 }
1364}
1365
1366#[cfg(test)]
1367mod tests {
1368 use std::collections::{BTreeMap, HashMap, HashSet};
1369 use std::sync::Arc;
1370
1371 use bytes::Bytes;
1372 use prometheus::Registry;
1373 use risingwave_common::catalog::{TableId, TableOption};
1374 use risingwave_common::config::MetricLevel;
1375 use risingwave_common::hash::VirtualNode;
1376 use risingwave_common::util::epoch::test_epoch;
1377 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
1378 use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_bytes};
1379 use risingwave_hummock_sdk::key_range::KeyRange;
1380 use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
1381 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
1382 use risingwave_hummock_sdk::version::HummockVersion;
1383 use risingwave_hummock_sdk::{EpochWithGap, HummockSstableObjectId};
1384 use risingwave_pb::hummock::hummock_version::PbLevels;
1385 use risingwave_pb::hummock::{
1386 LevelType as PbLevelType, PbBloomFilterType, PbHummockVersion, PbLevel, PbOverlappingLevel,
1387 PbStateTableInfo, StateTableInfoDelta,
1388 };
1389 use tokio::sync::mpsc::unbounded_channel;
1390
1391 use crate::hummock::HummockValue;
1392 use crate::hummock::iterator::test_utils::mock_sstable_store;
1393 use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
1394 use crate::hummock::store::version::{CommittedVersion, HummockVersionReader};
1395 use crate::hummock::test_utils::{
1396 default_builder_opt_for_test, gen_test_sstable_with_table_ids,
1397 };
1398 use crate::monitor::{HummockStateStoreMetrics, flush_local_metrics_for_test};
1399 use crate::store::ReadOptions;
1400
1401 #[tokio::test]
1405 async fn test_get_skips_sst_by_table_id_filter() {
1406 let query_table_id = TableId::new(100);
1407 let epoch: u64 = (31 * 1000) << 16;
1408 let compaction_group_id = StaticCompactionGroupId::StateDefault;
1409
1410 let sst_info = SstableInfoInner {
1412 sst_id: 1.into(),
1413 object_id: 1.into(),
1414 key_range: KeyRange {
1415 left: Bytes::from(
1416 FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
1417 ),
1418 right: Bytes::from(
1419 FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
1420 ),
1421 right_exclusive: false,
1422 },
1423 table_ids: vec![TableId::new(50), TableId::new(150)],
1424 file_size: 1024,
1425 ..Default::default()
1426 }
1427 .into();
1428
1429 let level = Level {
1430 level_idx: 1,
1431 level_type: PbLevelType::Nonoverlapping,
1432 table_infos: vec![sst_info],
1433 total_file_size: 0,
1434 sub_level_id: 0,
1435 uncompressed_file_size: 0,
1436 vnode_partition_count: 0,
1437 };
1438
1439 #[allow(deprecated)]
1440 let levels = Levels {
1441 levels: vec![level],
1442 l0: OverlappingLevel::default(),
1443 group_id: compaction_group_id,
1444 parent_group_id: compaction_group_id,
1445 member_table_ids: vec![],
1446 compaction_group_version_id: 0,
1447 };
1448
1449 let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
1450 id: 1u64.into(),
1451 ..Default::default()
1452 });
1453 version.levels.insert(compaction_group_id, levels);
1454 version.state_table_info.apply_delta(
1455 &HashMap::from([(
1456 query_table_id,
1457 StateTableInfoDelta {
1458 committed_epoch: epoch,
1459 compaction_group_id,
1460 },
1461 )]),
1462 &HashSet::new(),
1463 );
1464
1465 let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
1466 let reader = HummockVersionReader::new(
1467 mock_sstable_store().await,
1468 Arc::new(HummockStateStoreMetrics::unused()),
1469 0,
1470 );
1471
1472 let result = reader
1473 .get(
1474 TableKey(Bytes::from("test_key")),
1475 epoch,
1476 query_table_id,
1477 TableOption::default(),
1478 ReadOptions::default(),
1479 (vec![], vec![], pinned_version),
1480 |_key, _value| Ok(()),
1481 )
1482 .await
1483 .unwrap();
1484
1485 assert!(result.is_none());
1486 }
1487
1488 #[allow(deprecated)]
1490 fn build_version_with_vnode_stats(
1491 table_id: TableId,
1492 vnode_stats: VnodeStatistics,
1493 key_range: (Vec<u8>, Vec<u8>),
1494 level_type: PbLevelType,
1495 ) -> (SstableInfo, CommittedVersion) {
1496 let object_id = HummockSstableObjectId::new(1);
1497 let left_full_key = FullKey::new_with_gap_epoch(
1498 table_id,
1499 TableKey(Bytes::from(key_range.0)),
1500 EpochWithGap::new_from_epoch(test_epoch(0)),
1501 )
1502 .encode();
1503 let right_full_key = FullKey::new_with_gap_epoch(
1504 table_id,
1505 TableKey(Bytes::from(key_range.1)),
1506 EpochWithGap::new_from_epoch(test_epoch(0)),
1507 )
1508 .encode();
1509
1510 let sstable_info: SstableInfo = SstableInfoInner {
1511 object_id,
1512 sst_id: object_id.as_raw_id().into(),
1513 key_range: KeyRange {
1514 left: Bytes::from(left_full_key),
1515 right: Bytes::from(right_full_key),
1516 right_exclusive: false,
1517 },
1518 file_size: 1,
1519 table_ids: vec![table_id],
1520 meta_offset: 0,
1521 stale_key_count: 0,
1522 total_key_count: 0,
1523 min_epoch: 0,
1524 max_epoch: 0,
1525 uncompressed_file_size: 0,
1526 range_tombstone_count: 0,
1527 bloom_filter_kind: PbBloomFilterType::Sstable,
1528 filter_type: risingwave_pb::hummock::PbSstableFilterType::SstableFilterXor16,
1529 sst_size: 1,
1530 vnode_statistics: Some(vnode_stats),
1531 }
1532 .into();
1533 let pb_level = PbLevel {
1534 level_idx: if level_type == PbLevelType::Overlapping {
1535 0
1536 } else {
1537 1
1538 },
1539 level_type: level_type as i32,
1540 table_infos: vec![sstable_info.clone().into()],
1541 total_file_size: 1,
1542 sub_level_id: 0,
1543 uncompressed_file_size: 1,
1544 vnode_partition_count: 0,
1545 };
1546
1547 let (levels, l0) = if level_type == PbLevelType::Overlapping {
1548 (
1549 vec![],
1550 Some(PbOverlappingLevel {
1551 sub_levels: vec![pb_level],
1552 total_file_size: 1,
1553 uncompressed_file_size: 1,
1554 }),
1555 )
1556 } else {
1557 (vec![pb_level], Some(PbOverlappingLevel::default()))
1558 };
1559
1560 let pb_levels = PbLevels {
1561 levels,
1562 l0,
1563 group_id: StaticCompactionGroupId::NewCompactionGroup,
1564 parent_group_id: 0.into(),
1565 member_table_ids: vec![],
1566 compaction_group_version_id: 0,
1567 };
1568
1569 let pb_version = PbHummockVersion {
1570 id: 1.into(),
1571 levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1572 max_committed_epoch: 0,
1573 table_watermarks: HashMap::new(),
1574 table_change_logs: HashMap::new(),
1575 state_table_info: HashMap::from_iter([(
1576 table_id,
1577 PbStateTableInfo {
1578 committed_epoch: 0,
1579 compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1580 },
1581 )]),
1582 vector_indexes: HashMap::new(),
1583 };
1584
1585 let version = HummockVersion::from(&pb_version);
1586 let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1587 let pinned = PinnedVersion::new(version, tx);
1588 (sstable_info, pinned)
1589 }
1590
1591 #[allow(deprecated)]
1593 fn build_version_from_sstables(
1594 table_id: TableId,
1595 sstable_infos: Vec<SstableInfo>,
1596 level_type: PbLevelType,
1597 ) -> CommittedVersion {
1598 let total_file_size = sstable_infos.iter().map(|sst| sst.file_size).sum::<u64>();
1599 let uncompressed_file_size = sstable_infos
1600 .iter()
1601 .map(|sst| sst.uncompressed_file_size)
1602 .sum::<u64>();
1603 let pb_level = PbLevel {
1604 level_idx: if level_type == PbLevelType::Overlapping {
1605 0
1606 } else {
1607 1
1608 },
1609 level_type: level_type as i32,
1610 table_infos: sstable_infos.into_iter().map(Into::into).collect(),
1611 total_file_size,
1612 sub_level_id: 0,
1613 uncompressed_file_size,
1614 vnode_partition_count: 0,
1615 };
1616
1617 let (levels, l0) = if level_type == PbLevelType::Overlapping {
1618 (
1619 vec![],
1620 Some(PbOverlappingLevel {
1621 sub_levels: vec![pb_level],
1622 total_file_size,
1623 uncompressed_file_size,
1624 }),
1625 )
1626 } else {
1627 (vec![pb_level], Some(PbOverlappingLevel::default()))
1628 };
1629
1630 let pb_levels = PbLevels {
1631 levels,
1632 l0,
1633 group_id: StaticCompactionGroupId::NewCompactionGroup,
1634 parent_group_id: 0.into(),
1635 member_table_ids: vec![],
1636 compaction_group_version_id: 0,
1637 };
1638
1639 let pb_version = PbHummockVersion {
1640 id: 1.into(),
1641 levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1642 max_committed_epoch: 0,
1643 table_watermarks: HashMap::new(),
1644 table_change_logs: HashMap::new(),
1645 state_table_info: HashMap::from_iter([(
1646 table_id,
1647 PbStateTableInfo {
1648 committed_epoch: 0,
1649 compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1650 },
1651 )]),
1652 vector_indexes: HashMap::new(),
1653 };
1654
1655 let version = HummockVersion::from(&pb_version);
1656 let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1657 PinnedVersion::new(version, tx)
1658 }
1659
1660 #[allow(deprecated)]
1662 fn build_version_from_sstable(
1663 table_id: TableId,
1664 sstable_info: SstableInfo,
1665 ) -> CommittedVersion {
1666 build_version_from_sstables(table_id, vec![sstable_info], PbLevelType::Nonoverlapping)
1667 }
1668
1669 fn vnode_prune_counts(
1670 metrics: &HummockStateStoreMetrics,
1671 table_id: TableId,
1672 operation: &str,
1673 ) -> (u64, u64) {
1674 let table_label = table_id.to_string();
1675 let checked = metrics
1676 .vnode_pruning_counts
1677 .with_guarded_label_values(&[
1678 table_label.clone(),
1679 operation.to_owned(),
1680 "checked".to_owned(),
1681 ])
1682 .get();
1683 let pruned = metrics
1684 .vnode_pruning_counts
1685 .with_guarded_label_values(&[table_label, operation.to_owned(), "pruned".to_owned()])
1686 .get();
1687 (checked, pruned)
1688 }
1689
1690 async fn assert_vnode_prune_get_skips_out_of_range_key(
1691 table_id: TableId,
1692 epoch: u64,
1693 level_type: PbLevelType,
1694 ) {
1695 let sstable_store = mock_sstable_store().await;
1696 let registry = Registry::new();
1697 let metrics = Arc::new(HummockStateStoreMetrics::new(®istry, MetricLevel::Debug));
1698 let reader = HummockVersionReader::new(sstable_store, metrics.clone(), 0);
1699 let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1700
1701 let make_user_key = |vnode: VirtualNode, suffix: &str| {
1702 let mut raw = vnode.to_be_bytes().to_vec();
1703 raw.extend_from_slice(suffix.as_bytes());
1704 UserKey::new(table_id, TableKey(raw.into()))
1705 };
1706
1707 let vnode_stats = VnodeStatistics::from_map(BTreeMap::from_iter([(
1709 VirtualNode::from_index(1),
1710 (
1711 make_user_key(VirtualNode::from_index(1), "aa"),
1712 make_user_key(VirtualNode::from_index(1), "bb"),
1713 ),
1714 )]));
1715
1716 let key_range = {
1718 let mut left = VirtualNode::from_index(0).to_be_bytes().to_vec();
1719 left.extend_from_slice(b"aa");
1720 let mut right = VirtualNode::from_index(1).to_be_bytes().to_vec();
1721 right.extend_from_slice(b"zzzz");
1722 (left, right)
1723 };
1724
1725 let (_sst, committed) =
1726 build_version_with_vnode_stats(table_id, vnode_stats, key_range, level_type);
1727
1728 let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1730 raw.extend_from_slice(b"zz");
1731 let table_key = TableKey(Bytes::from(raw.clone()));
1732
1733 let result = reader
1734 .get(
1735 table_key,
1736 epoch,
1737 table_id,
1738 TableOption::default(),
1739 ReadOptions::default(),
1740 (vec![], vec![], committed),
1741 |_k, v| Ok(Bytes::copy_from_slice(v)),
1742 )
1743 .await
1744 .unwrap();
1745 flush_local_metrics_for_test();
1746
1747 assert!(
1748 result.is_none(),
1749 "vnode pruning should skip SST without reading data"
1750 );
1751 let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1752 assert_eq!(checked_before + 1, checked_after);
1753 assert_eq!(pruned_before + 1, pruned_after);
1754 }
1755
1756 async fn assert_vnode_prune_get_not_pruned_nonoverlapping() {
1757 let table_id = TableId::new(42);
1758 let epoch = test_epoch(3);
1759 let sstable_store = mock_sstable_store().await;
1760 let registry = Registry::new();
1761 let metrics = Arc::new(HummockStateStoreMetrics::new(®istry, MetricLevel::Debug));
1762 let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1763 let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1764
1765 let mut opts = default_builder_opt_for_test();
1766 opts.max_vnode_key_range_bytes = None;
1767 let mut kvs = vec![
1768 (
1769 FullKey::new_with_gap_epoch(
1770 table_id,
1771 gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1772 EpochWithGap::new_from_epoch(epoch),
1773 ),
1774 HummockValue::put(Bytes::from_static(b"v1")),
1775 ),
1776 (
1777 FullKey::new_with_gap_epoch(
1778 table_id,
1779 gen_key_from_bytes(VirtualNode::ZERO, b"cc"),
1780 EpochWithGap::new_from_epoch(epoch),
1781 ),
1782 HummockValue::put(Bytes::from_static(b"v0")),
1783 ),
1784 ];
1785 kvs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1786 let (_, mut sstable_info): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1787 gen_test_sstable_with_table_ids(
1788 opts,
1789 10,
1790 kvs.into_iter(),
1791 sstable_store.clone(),
1792 vec![table_id.as_raw_id()],
1793 )
1794 .await;
1795 let mut inner = sstable_info.get_inner();
1797 inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1798 VirtualNode::from_index(1),
1799 (
1800 UserKey::new(
1801 table_id,
1802 gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1803 ),
1804 UserKey::new(
1805 table_id,
1806 gen_key_from_bytes(VirtualNode::from_index(1), b"zz"),
1807 ),
1808 ),
1809 )])));
1810 sstable_info = inner.into();
1811 let committed = build_version_from_sstable(table_id, sstable_info.clone());
1812
1813 let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1815 raw.extend_from_slice(b"aa");
1816 let table_key = TableKey(Bytes::from(raw.clone()));
1817
1818 let result = reader
1819 .get(
1820 table_key,
1821 epoch,
1822 table_id,
1823 TableOption::default(),
1824 ReadOptions::default(),
1825 (vec![], vec![], committed),
1826 |_k, v| Ok(Bytes::copy_from_slice(v)),
1827 )
1828 .await
1829 .unwrap();
1830 flush_local_metrics_for_test();
1831 assert!(result.is_some(), "key should be read when not pruned");
1832 let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1833 assert_eq!(checked_before + 1, checked_after);
1834 assert_eq!(pruned_before, pruned_after);
1835 }
1836
1837 #[tokio::test]
1838 async fn test_vnode_prune_get_single_sst_cases() {
1839 assert_vnode_prune_get_skips_out_of_range_key(
1840 TableId::default(),
1841 test_epoch(1),
1842 PbLevelType::Nonoverlapping,
1843 )
1844 .await;
1845 assert_vnode_prune_get_skips_out_of_range_key(
1846 TableId::new(7),
1847 test_epoch(2),
1848 PbLevelType::Overlapping,
1849 )
1850 .await;
1851 assert_vnode_prune_get_not_pruned_nonoverlapping().await;
1852 }
1853
1854 #[tokio::test]
1855 async fn test_vnode_prune_get_overlapping_distribution_prunes_only_out_of_range_sst() {
1856 let table_id = TableId::new(77);
1857 let epoch = test_epoch(4);
1858 let vnode = VirtualNode::from_index(1);
1859 let sstable_store = mock_sstable_store().await;
1860 let registry = Registry::new();
1861 let metrics = Arc::new(HummockStateStoreMetrics::new(®istry, MetricLevel::Debug));
1862 let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1863 let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1864
1865 let mut opts = default_builder_opt_for_test();
1866 opts.max_vnode_key_range_bytes = None;
1867
1868 let mut kvs1 = vec![
1869 (
1870 FullKey::new_with_gap_epoch(
1871 table_id,
1872 gen_key_from_bytes(vnode, b"aa"),
1873 EpochWithGap::new_from_epoch(epoch),
1874 ),
1875 HummockValue::put(Bytes::from_static(b"s1_aa")),
1876 ),
1877 (
1878 FullKey::new_with_gap_epoch(
1879 table_id,
1880 gen_key_from_bytes(vnode, b"zz"),
1881 EpochWithGap::new_from_epoch(epoch),
1882 ),
1883 HummockValue::put(Bytes::from_static(b"s1_zz")),
1884 ),
1885 ];
1886 kvs1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1887 let (_, mut sst1): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1888 gen_test_sstable_with_table_ids(
1889 opts.clone(),
1890 11,
1891 kvs1.into_iter(),
1892 sstable_store.clone(),
1893 vec![table_id.as_raw_id()],
1894 )
1895 .await;
1896 let mut sst1_inner = sst1.get_inner();
1897 sst1_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1898 vnode,
1899 (
1900 UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1901 UserKey::new(table_id, gen_key_from_bytes(vnode, b"bb")),
1902 ),
1903 )])));
1904 sst1 = sst1_inner.into();
1905
1906 let mut kvs2 = vec![(
1907 FullKey::new_with_gap_epoch(
1908 table_id,
1909 gen_key_from_bytes(vnode, b"mm"),
1910 EpochWithGap::new_from_epoch(epoch),
1911 ),
1912 HummockValue::put(Bytes::from_static(b"hit")),
1913 )];
1914 kvs2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1915 let (_, mut sst2): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1916 gen_test_sstable_with_table_ids(
1917 opts,
1918 12,
1919 kvs2.into_iter(),
1920 sstable_store.clone(),
1921 vec![table_id.as_raw_id()],
1922 )
1923 .await;
1924 let mut sst2_inner = sst2.get_inner();
1925 sst2_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1926 vnode,
1927 (
1928 UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1929 UserKey::new(table_id, gen_key_from_bytes(vnode, b"zz")),
1930 ),
1931 )])));
1932 sst2 = sst2_inner.into();
1933
1934 let committed =
1935 build_version_from_sstables(table_id, vec![sst1, sst2], PbLevelType::Overlapping);
1936
1937 let table_key = gen_key_from_bytes(vnode, b"mm");
1938 let result = reader
1939 .get(
1940 table_key,
1941 epoch,
1942 table_id,
1943 TableOption::default(),
1944 ReadOptions::default(),
1945 (vec![], vec![], committed),
1946 |_k, v| Ok(Bytes::copy_from_slice(v)),
1947 )
1948 .await
1949 .unwrap();
1950 flush_local_metrics_for_test();
1951
1952 assert_eq!(result, Some(Bytes::from_static(b"hit")));
1953 let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1954 assert_eq!(checked_before + 2, checked_after);
1956 assert_eq!(pruned_before + 1, pruned_after);
1957 }
1958}