risingwave_storage/hummock/store/
version.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::{self};
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::array::VectorRef;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{TableId, TableOption};
29use risingwave_common::hash::VirtualNode;
30use risingwave_common::util::epoch::MAX_SPILL_TIMES;
31use risingwave_hummock_sdk::key::{
32    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
33};
34use risingwave_hummock_sdk::key_range::KeyRangeCommon;
35use risingwave_hummock_sdk::sstable_info::SstableInfo;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
40use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
41use risingwave_pb::hummock::LevelType;
42use sync_point::sync_point;
43use tracing::warn;
44
45use crate::error::StorageResult;
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::iterator::change_log::ChangeLogIterator;
48use crate::hummock::iterator::{
49    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
50};
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
53use crate::hummock::sstable_store::SstableStoreRef;
54use crate::hummock::table_change_log_manager::TableChangeLogManager;
55use crate::hummock::utils::{
56    MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
57    range_overlap, search_sst_idx,
58};
59use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
60use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
61use crate::hummock::{
62    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
63    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
64    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
65    hit_sstable_filter,
66};
67use crate::mem_table::{
68    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
69};
70use crate::monitor::{
71    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
72};
73use crate::store::{
74    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
75};
76use crate::vector::hnsw::nearest;
77use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
78
/// The committed version held by a read version; an alias of [`PinnedVersion`].
pub type CommittedVersion = PinnedVersion;
80
/// A batch of uncommitted SSTs kept in the staging area of a read version.
///
/// Data not committed to Hummock is staging data. There are two types of staging data:
/// - Immutable memtable: data that has been written into local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   hummock version.
///
/// This struct describes the second type.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    // NOTE(review): only exposed through `old_value_sstable_infos()` in this part of the file;
    // presumably the SSTs that store old values — confirm against the producer.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // Ids of the immutable memtables associated with these SSTs, keyed by local instance id.
    // newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    // NOTE(review): presumably the total size of the associated immutable memtables — confirm.
    imm_size: usize,
}
98
99impl StagingSstableInfo {
100    pub fn new(
101        sstable_infos: Vec<LocalSstableInfo>,
102        old_value_sstable_infos: Vec<LocalSstableInfo>,
103        epochs: Vec<HummockEpoch>,
104        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
105        imm_size: usize,
106    ) -> Self {
107        // the epochs are sorted from higher epoch to lower epoch
108        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
109        Self {
110            sstable_infos,
111            old_value_sstable_infos,
112            epochs,
113            imm_ids,
114            imm_size,
115        }
116    }
117
118    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
119        &self.sstable_infos
120    }
121
122    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
123        &self.old_value_sstable_infos
124    }
125
126    pub fn imm_size(&self) -> usize {
127        self.imm_size
128    }
129
130    pub fn epochs(&self) -> &Vec<HummockEpoch> {
131        &self.epochs
132    }
133
134    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
135        &self.imm_ids
136    }
137}
138
/// A change to be applied to a `HummockReadVersion` through `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// A batch of uncommitted SSTs. Applying it pops this instance's matching imms from the
    /// uploading queue and pushes the batch to the front of the staging SST queue.
    Sst(Arc<StagingSstableInfo>),
    /// A new committed version. Applying it cleans up staging data that is not newer than the
    /// table's committed epoch and replaces the committed version of the read version.
    CommittedSnapshot(CommittedVersion),
    /// New vnode watermarks written at `epoch`. Applying it adds them to the table watermark
    /// index, creating the index if the read version has none yet.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
149
/// Staging data of a read version: immutable memtables (pending and uploading) and staging
/// SSTs.
pub struct StagingVersion {
    /// Running total of `size()` of the imms in `pending_imms`: increased when a pending imm is
    /// added and reset to 0 when the pending imms are handed over for upload.
    pending_imm_size: usize,
    /// It contains the imms added but not sent to the uploader of hummock event handler.
    /// It is non-empty only when `upload_on_flush` is false.
    ///
    /// It will be sent to the uploader when `pending_imm_size` exceed threshold or on `seal_current_epoch`.
    ///
    /// newer data comes last
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// It contains the imms already sent to uploader of hummock event handler.
    /// Note: Currently, building imm and writing to staging version is not atomic, and therefore
    /// imm of smaller batch id may be added later than one with greater batch id
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Staging SST batches.
    // newer data comes first
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
169
170impl StagingVersion {
171    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
172    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
173    pub fn prune_overlap<'a>(
174        &'a self,
175        max_epoch_inclusive: HummockEpoch,
176        table_id: TableId,
177        table_key_range: &'a TableKeyRange,
178    ) -> (
179        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
180        impl Iterator<Item = &'a SstableInfo> + 'a,
181    ) {
182        let (left, right) = table_key_range;
183        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
184        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
185        let overlapped_imms = self
186            .pending_imms
187            .iter()
188            .map(|(imm, _)| imm)
189            .rev() // rev to let newer imm come first
190            .chain(self.uploading_imms.iter())
191            .filter(move |imm| {
192                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
193                imm.epoch() <= max_epoch_inclusive
194                    && imm.table_id == table_id
195                    && range_overlap(
196                        &(left, right),
197                        &imm.start_table_key(),
198                        Bound::Included(&imm.end_table_key()),
199                    )
200            });
201
202        // TODO: Remove duplicate sst based on sst id
203        let overlapped_ssts = self
204            .sst
205            .iter()
206            .filter(move |staging_sst| {
207                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
208                sst_max_epoch <= max_epoch_inclusive
209            })
210            .flat_map(move |staging_sst| {
211                // TODO: sstable info should be concat-able after each streaming table owns a read
212                // version. May use concat sstable iter instead in some cases.
213                staging_sst
214                    .sstable_infos
215                    .iter()
216                    .map(|sstable| &sstable.sst_info)
217                    .filter(move |sstable: &&SstableInfo| {
218                        filter_single_sst(sstable, table_id, table_key_range)
219                    })
220            });
221        (overlapped_imms, overlapped_ssts)
222    }
223
224    pub fn is_empty(&self) -> bool {
225        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
226    }
227}
228
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    /// Id of the table this read version belongs to.
    table_id: TableId,
    /// Id of the local instance; used to look up this instance's imm ids in a staging SST
    /// update.
    instance_id: LocalInstanceId,

    /// Starts as `false` and is set by `init`; adding imms and starting an upload assert that
    /// it is `true`.
    is_initialized: bool,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicate if this is replicated. If it is, we should ignore it during
    /// global state store read, to avoid duplicated results.
    /// Otherwise for local state store, it is fine, since we will see the
    /// `ReadVersion` just for that local state store.
    is_replicated: bool,

    /// Watermark index of the table. `None` until watermarks for the table are found in a
    /// committed version or a new table watermark is applied.
    table_watermarks: Option<TableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version
    // It will be initialized after local state store init
    vnodes: Arc<Bitmap>,
}
254
255impl HummockReadVersion {
256    pub fn new_with_replication_option(
257        table_id: TableId,
258        instance_id: LocalInstanceId,
259        committed_version: CommittedVersion,
260        is_replicated: bool,
261        vnodes: Arc<Bitmap>,
262    ) -> Self {
263        // before build `HummockReadVersion`, we need to get the a initial version which obtained
264        // from meta. want this initialization after version is initialized (now with
265        // notification), so add a assert condition to guarantee correct initialization order
266        assert!(committed_version.is_valid());
267        Self {
268            table_id,
269            instance_id,
270            table_watermarks: {
271                match committed_version.table_watermarks.get(&table_id) {
272                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
273                        table_watermarks.clone(),
274                        committed_version
275                            .state_table_info
276                            .info()
277                            .get(&table_id)
278                            .expect("should exist")
279                            .committed_epoch,
280                        table_watermarks.watermark_type,
281                    )),
282                    None => None,
283                }
284            },
285            staging: StagingVersion {
286                pending_imm_size: 0,
287                pending_imms: Vec::default(),
288                uploading_imms: VecDeque::default(),
289                sst: VecDeque::default(),
290            },
291
292            committed: committed_version,
293
294            is_replicated,
295            vnodes,
296            is_initialized: false,
297        }
298    }
299
300    pub fn new(
301        table_id: TableId,
302        instance_id: LocalInstanceId,
303        committed_version: CommittedVersion,
304        vnodes: Arc<Bitmap>,
305    ) -> Self {
306        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
307    }
308
309    pub fn table_id(&self) -> TableId {
310        self.table_id
311    }
312
313    pub fn init(&mut self) {
314        assert!(!self.is_initialized);
315        self.is_initialized = true;
316    }
317
318    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
319        assert!(self.is_initialized);
320        assert!(!self.is_replicated);
321        if let Some(item) = self
322            .staging
323            .pending_imms
324            .last()
325            .map(|(imm, _)| imm)
326            .or_else(|| self.staging.uploading_imms.front())
327        {
328            // check batch_id order from newest to old
329            assert!(item.batch_id() < imm.batch_id());
330        }
331
332        self.staging.pending_imm_size += imm.size();
333        self.staging.pending_imms.push((imm, tracker));
334    }
335
336    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
337        assert!(self.is_initialized);
338        assert!(self.is_replicated);
339        assert!(self.staging.pending_imms.is_empty());
340        if let Some(item) = self.staging.uploading_imms.front() {
341            // check batch_id order from newest to old
342            assert!(item.batch_id() < imm.batch_id());
343        }
344        self.staging.uploading_imms.push_front(imm);
345    }
346
347    pub fn pending_imm_size(&self) -> usize {
348        self.staging.pending_imm_size
349    }
350
351    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
352        assert!(self.is_initialized);
353        assert!(!self.is_replicated);
354        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
355        for (imm, _) in &pending_imms {
356            self.staging.uploading_imms.push_front(imm.clone());
357        }
358        self.staging.pending_imm_size = 0;
359        pending_imms
360    }
361
362    /// Updates the read version with `VersionUpdate`.
363    /// There will be three data types to be processed
364    /// `VersionUpdate::Staging`
365    ///     - `StagingData::ImmMem` -> Insert into memory's `staging_imm`
366    ///     - `StagingData::Sst` -> Update the sst to memory's `staging_sst` and remove the
367    ///       corresponding `staging_imms` according to the `batch_id`
368    /// `VersionUpdate::CommittedDelta` -> Unimplemented yet
369    /// `VersionUpdate::CommittedSnapshot` -> Update `committed_version` , and clean up related
370    /// `staging_sst` and `staging_imm` in memory according to epoch
371    pub fn update(&mut self, info: VersionUpdate) {
372        match info {
373            VersionUpdate::Sst(staging_sst_ref) => {
374                {
375                    assert!(!self.is_replicated);
376                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
377                        warn!(
378                            instance_id = self.instance_id,
379                            "no related imm in sst input"
380                        );
381                        return;
382                    };
383
384                    // old data comes first
385                    for imm_id in imms.iter().rev() {
386                        let check_err = match self.staging.uploading_imms.pop_back() {
387                            None => Some("empty".to_owned()),
388                            Some(prev_imm_id) => {
389                                if prev_imm_id.batch_id() == *imm_id {
390                                    None
391                                } else {
392                                    Some(format!(
393                                        "miss match id {} {}",
394                                        prev_imm_id.batch_id(),
395                                        *imm_id
396                                    ))
397                                }
398                            }
399                        };
400                        assert!(
401                            check_err.is_none(),
402                            "should be valid staging_sst.size {},
403                                    staging_sst.imm_ids {:?},
404                                    staging_sst.epochs {:?},
405                                    local_pending_imm_ids {:?},
406                                    local_uploading_imm_ids {:?},
407                                    instance_id {}
408                                    check_err {:?}",
409                            staging_sst_ref.imm_size,
410                            staging_sst_ref.imm_ids,
411                            staging_sst_ref.epochs,
412                            self.staging
413                                .pending_imms
414                                .iter()
415                                .map(|(imm, _)| imm.batch_id())
416                                .collect_vec(),
417                            self.staging
418                                .uploading_imms
419                                .iter()
420                                .map(|imm| imm.batch_id())
421                                .collect_vec(),
422                            self.instance_id,
423                            check_err
424                        );
425                    }
426
427                    self.staging.sst.push_front(staging_sst_ref);
428                }
429            }
430
431            VersionUpdate::CommittedSnapshot(committed_version) => {
432                if let Some(info) = committed_version
433                    .state_table_info
434                    .info()
435                    .get(&self.table_id)
436                {
437                    let committed_epoch = info.committed_epoch;
438                    if self.is_replicated {
439                        self.staging
440                            .uploading_imms
441                            .retain(|imm| imm.epoch() > committed_epoch);
442                        self.staging
443                            .pending_imms
444                            .retain(|(imm, _)| imm.epoch() > committed_epoch);
445                    } else {
446                        self.staging
447                            .pending_imms
448                            .iter()
449                            .map(|(imm, _)| imm)
450                            .chain(self.staging.uploading_imms.iter())
451                            .for_each(|imm| {
452                                assert!(
453                                    imm.epoch() > committed_epoch,
454                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
455                                    imm.table_id,
456                                    imm.epoch(),
457                                    committed_epoch
458                                )
459                            });
460                    }
461
462                    self.staging.sst.retain(|sst| {
463                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
464                    });
465
466                    // check epochs.last() > MCE
467                    assert!(self.staging.sst.iter().all(|sst| {
468                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
469                    }));
470
471                    if let Some(committed_watermarks) =
472                        committed_version.table_watermarks.get(&self.table_id)
473                    {
474                        if let Some(watermark_index) = &mut self.table_watermarks {
475                            watermark_index.apply_committed_watermarks(
476                                committed_watermarks.clone(),
477                                committed_epoch,
478                            );
479                        } else {
480                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
481                                committed_watermarks.clone(),
482                                committed_epoch,
483                                committed_watermarks.watermark_type,
484                            ));
485                        }
486                    }
487                }
488
489                self.committed = committed_version;
490            }
491            VersionUpdate::NewTableWatermark {
492                direction,
493                epoch,
494                vnode_watermarks,
495                watermark_type,
496            } => {
497                if let Some(watermark_index) = &mut self.table_watermarks {
498                    watermark_index.add_epoch_watermark(
499                        epoch,
500                        Arc::from(vnode_watermarks),
501                        direction,
502                    );
503                } else {
504                    self.table_watermarks = Some(TableWatermarksIndex::new(
505                        direction,
506                        epoch,
507                        vnode_watermarks,
508                        self.committed.table_committed_epoch(self.table_id),
509                        watermark_type,
510                    ));
511                }
512            }
513        }
514    }
515
516    pub fn staging(&self) -> &StagingVersion {
517        &self.staging
518    }
519
520    pub fn committed(&self) -> &CommittedVersion {
521        &self.committed
522    }
523
524    /// We have assumption that the watermark is increasing monotonically. Therefore,
525    /// here if the upper layer usage has passed an regressed watermark, we should
526    /// filter out the regressed watermark. Currently the kv log store may write
527    /// regressed watermark
528    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
529        if let Some(watermark_index) = &self.table_watermarks {
530            watermark_index.filter_regress_watermarks(watermarks)
531        }
532    }
533
534    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
535        self.table_watermarks
536            .as_ref()
537            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
538    }
539
540    pub fn is_initialized(&self) -> bool {
541        self.is_initialized
542    }
543
544    pub fn is_replicated(&self) -> bool {
545        self.is_replicated
546    }
547
548    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
549        std::mem::replace(&mut self.vnodes, vnodes)
550    }
551
552    pub fn contains(&self, vnode: VirtualNode) -> bool {
553        self.vnodes.is_set(vnode.to_index())
554    }
555
556    pub fn vnodes(&self) -> Arc<Bitmap> {
557        self.vnodes.clone()
558    }
559}
560
561pub fn read_filter_for_version(
562    epoch: HummockEpoch,
563    table_id: TableId,
564    mut table_key_range: TableKeyRange,
565    read_version: &RwLock<HummockReadVersion>,
566) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
567    let read_version_guard = read_version.read();
568
569    let committed_version = read_version_guard.committed().clone();
570
571    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
572        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
573    }
574
575    let (imm_iter, sst_iter) =
576        read_version_guard
577            .staging()
578            .prune_overlap(epoch, table_id, &table_key_range);
579
580    let imms = imm_iter.cloned().collect();
581    let ssts = sst_iter.cloned().collect();
582
583    Ok((table_key_range, (imms, ssts, committed_version)))
584}
585
/// Reads from a `ReadVersionTuple` through the sstable store, recording state store metrics.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store used to fetch the SSTs that are read.
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    // NOTE(review): not read in this part of the file; presumably the number of retries when
    // preloading — confirm against its use.
    preload_retry_times: usize,
}
594
595/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
596/// `streaming_query`
597impl HummockVersionReader {
598    pub fn new(
599        sstable_store: SstableStoreRef,
600        state_store_metrics: Arc<HummockStateStoreMetrics>,
601        preload_retry_times: usize,
602    ) -> Self {
603        Self {
604            sstable_store,
605            state_store_metrics,
606            preload_retry_times,
607        }
608    }
609
610    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
611        &self.state_store_metrics
612    }
613}
614
// NOTE(review): used outside this part of the file; judging by the name, a threshold in seconds
// above which fetching metadata for an iterator is considered slow — confirm at the use site.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
616
617impl HummockVersionReader {
618    fn skip_get_by_vnode_user_key_range(
619        sstable_info: &SstableInfo,
620        vnode: VirtualNode,
621        user_key: UserKey<&[u8]>,
622        local_stats: &mut StoreLocalStatistic,
623    ) -> bool {
624        if let Some(vnode_statistics) = &sstable_info.vnode_statistics {
625            // Only skip if vnode-statistics exists and key is out of range.
626            // If vnode-statistics not found, it may be due to incomplete stats (reached limit).
627            if let Some((vnode_min, vnode_max)) = vnode_statistics.get_vnode_user_key_range(vnode) {
628                local_stats.vnode_checked_get_count += 1;
629                if user_key < vnode_min.as_ref() || user_key > vnode_max.as_ref() {
630                    local_stats.vnode_pruned_get_count += 1;
631                    return true;
632                }
633            }
634        }
635        false
636    }
637
638    pub async fn get<'a, O>(
639        &'a self,
640        table_key: TableKey<Bytes>,
641        epoch: u64,
642        table_id: TableId,
643        table_option: TableOption,
644        read_options: ReadOptions,
645        read_version_tuple: ReadVersionTuple,
646        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
647    ) -> StorageResult<Option<O>> {
648        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
649
650        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
651        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
652        let local_stats = &mut stats_guard.local_stats;
653        local_stats.found_key = true;
654
655        // 1. read staging data
656        for imm in &imms {
657            // skip imm that only holding out-of-date data
658            if imm.epoch() < min_epoch {
659                continue;
660            }
661
662            local_stats.staging_imm_get_count += 1;
663
664            if let Some((data, data_epoch)) = get_from_batch(
665                imm,
666                TableKey(table_key.as_ref()),
667                epoch,
668                &read_options,
669                local_stats,
670            ) {
671                return Ok(if data_epoch.pure_epoch() < min_epoch {
672                    None
673                } else {
674                    data.into_user_value()
675                        .map(|v| {
676                            on_key_value_fn(
677                                FullKey::new_with_gap_epoch(
678                                    table_id,
679                                    table_key.to_ref(),
680                                    data_epoch,
681                                ),
682                                v.as_ref(),
683                            )
684                        })
685                        .transpose()?
686                });
687            }
688        }
689
690        // 2. order guarantee: imm -> sst
691        let dist_key_hash = read_options
692            .prefix_hint
693            .as_ref()
694            .map(|dist_key| Sstable::hash_for_filter(dist_key.as_ref(), table_id.as_raw_id()));
695
696        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
697        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
698        let full_key = FullKey::new_with_gap_epoch(
699            table_id,
700            TableKey(table_key.clone()),
701            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
702        );
703        let single_table_key_range = table_key.clone()..=table_key.clone();
704
705        // prune uncommitted ssts with the keyrange
706        let pruned_uncommitted_ssts =
707            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
708        for local_sst in pruned_uncommitted_ssts {
709            local_stats.staging_sst_get_count += 1;
710            if let Some(iter) = get_from_sstable_info(
711                self.sstable_store.clone(),
712                local_sst,
713                full_key.to_ref(),
714                &read_options,
715                dist_key_hash,
716                local_stats,
717            )
718            .await?
719            {
720                debug_assert!(iter.is_valid());
721                let data_epoch = iter.key().epoch_with_gap;
722                return Ok(if data_epoch.pure_epoch() < min_epoch {
723                    None
724                } else {
725                    iter.value()
726                        .into_user_value()
727                        .map(|v| {
728                            on_key_value_fn(
729                                FullKey::new_with_gap_epoch(
730                                    table_id,
731                                    table_key.to_ref(),
732                                    data_epoch,
733                                ),
734                                v,
735                            )
736                        })
737                        .transpose()?
738                });
739            }
740        }
741        // 3. read from committed_version sst file
742        // Because SST meta records encoded key range,
743        // the filter key needs to be encoded as well.
744        assert!(committed_version.is_valid());
745        for level in committed_version.levels(table_id) {
746            if level.table_infos.is_empty() {
747                continue;
748            }
749
750            match level.level_type {
751                LevelType::Overlapping | LevelType::Unspecified => {
752                    let sstable_infos = prune_overlapping_ssts(
753                        &level.table_infos,
754                        table_id,
755                        &single_table_key_range,
756                    );
757                    for sstable_info in sstable_infos {
758                        // filter vnode-key range that is definitely not containing the key
759                        if Self::skip_get_by_vnode_user_key_range(
760                            sstable_info,
761                            VirtualNode::from_index(full_key.user_key.get_vnode_id()),
762                            full_key.user_key.as_ref(),
763                            local_stats,
764                        ) {
765                            continue;
766                        }
767
768                        local_stats.overlapping_get_count += 1;
769                        if let Some(iter) = get_from_sstable_info(
770                            self.sstable_store.clone(),
771                            sstable_info,
772                            full_key.to_ref(),
773                            &read_options,
774                            dist_key_hash,
775                            local_stats,
776                        )
777                        .await?
778                        {
779                            debug_assert!(iter.is_valid());
780                            let data_epoch = iter.key().epoch_with_gap;
781                            return Ok(if data_epoch.pure_epoch() < min_epoch {
782                                None
783                            } else {
784                                iter.value()
785                                    .into_user_value()
786                                    .map(|v| {
787                                        on_key_value_fn(
788                                            FullKey::new_with_gap_epoch(
789                                                table_id,
790                                                table_key.to_ref(),
791                                                data_epoch,
792                                            ),
793                                            v,
794                                        )
795                                    })
796                                    .transpose()?
797                            });
798                        }
799                    }
800                }
801                LevelType::Nonoverlapping => {
802                    let mut table_info_idx =
803                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
804                    if table_info_idx == 0 {
805                        continue;
806                    }
807                    table_info_idx = table_info_idx.saturating_sub(1);
808                    let sstable_info = &level.table_infos[table_info_idx];
809
810                    if sstable_info.table_ids.binary_search(&table_id).is_err() {
811                        continue;
812                    }
813
814                    // Filter SSTs that definitely cannot contain the key.
815                    let ord = sstable_info
816                        .key_range
817                        .compare_right_with_user_key(full_key.user_key.as_ref());
818                    // the case that the key falls into the gap between two ssts
819                    if ord == Ordering::Less {
820                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
821                        continue;
822                    }
823
824                    // Filter vnode-key range that is definitely not containing the key.
825                    if Self::skip_get_by_vnode_user_key_range(
826                        sstable_info,
827                        VirtualNode::from_index(full_key.user_key.get_vnode_id()),
828                        full_key.user_key.as_ref(),
829                        local_stats,
830                    ) {
831                        continue;
832                    }
833
834                    local_stats.non_overlapping_get_count += 1;
835                    if let Some(iter) = get_from_sstable_info(
836                        self.sstable_store.clone(),
837                        sstable_info,
838                        full_key.to_ref(),
839                        &read_options,
840                        dist_key_hash,
841                        local_stats,
842                    )
843                    .await?
844                    {
845                        debug_assert!(iter.is_valid());
846                        let data_epoch = iter.key().epoch_with_gap;
847                        return Ok(if data_epoch.pure_epoch() < min_epoch {
848                            None
849                        } else {
850                            iter.value()
851                                .into_user_value()
852                                .map(|v| {
853                                    on_key_value_fn(
854                                        FullKey::new_with_gap_epoch(
855                                            table_id,
856                                            table_key.to_ref(),
857                                            data_epoch,
858                                        ),
859                                        v,
860                                    )
861                                })
862                                .transpose()?
863                        });
864                    }
865                }
866            }
867        }
868        stats_guard.local_stats.found_key = false;
869        Ok(None)
870    }
871
872    pub async fn iter(
873        &self,
874        table_key_range: TableKeyRange,
875        epoch: u64,
876        table_id: TableId,
877        table_option: TableOption,
878        read_options: ReadOptions,
879        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
880    ) -> StorageResult<HummockStorageIterator> {
881        self.iter_with_memtable(
882            table_key_range,
883            epoch,
884            table_id,
885            table_option,
886            read_options,
887            read_version_tuple,
888            None,
889        )
890        .await
891    }
892
893    pub async fn iter_with_memtable<'b>(
894        &self,
895        table_key_range: TableKeyRange,
896        epoch: u64,
897        table_id: TableId,
898        table_option: TableOption,
899        read_options: ReadOptions,
900        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
901        memtable_iter: Option<MemTableHummockIterator<'b>>,
902    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
903        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
904        let user_key_range = (
905            user_key_range_ref.0.map(|key| key.cloned()),
906            user_key_range_ref.1.map(|key| key.cloned()),
907        );
908        let mut factory = ForwardIteratorFactory::default();
909        let mut local_stats = StoreLocalStatistic::default();
910        let (imms, uncommitted_ssts, committed) = read_version_tuple;
911        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
912        self.iter_inner(
913            table_key_range,
914            epoch,
915            table_id,
916            read_options,
917            imms,
918            uncommitted_ssts,
919            &committed,
920            &mut local_stats,
921            &mut factory,
922        )
923        .await?;
924        let merge_iter = factory.build(memtable_iter);
925        // the epoch_range left bound for iterator read
926        let mut user_iter = UserIterator::new(
927            merge_iter,
928            user_key_range,
929            epoch,
930            min_epoch,
931            Some(committed),
932        );
933        user_iter.rewind().await?;
934        Ok(HummockStorageIteratorInner::new(
935            user_iter,
936            self.state_store_metrics.clone(),
937            table_id,
938            local_stats,
939        ))
940    }
941
942    pub async fn rev_iter<'b>(
943        &self,
944        table_key_range: TableKeyRange,
945        epoch: u64,
946        table_id: TableId,
947        table_option: TableOption,
948        read_options: ReadOptions,
949        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
950        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
951    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
952        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
953        let user_key_range = (
954            user_key_range_ref.0.map(|key| key.cloned()),
955            user_key_range_ref.1.map(|key| key.cloned()),
956        );
957        let mut factory = BackwardIteratorFactory::default();
958        let mut local_stats = StoreLocalStatistic::default();
959        let (imms, uncommitted_ssts, committed) = read_version_tuple;
960        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
961        self.iter_inner(
962            table_key_range,
963            epoch,
964            table_id,
965            read_options,
966            imms,
967            uncommitted_ssts,
968            &committed,
969            &mut local_stats,
970            &mut factory,
971        )
972        .await?;
973        let merge_iter = factory.build(memtable_iter);
974        // the epoch_range left bound for iterator read
975        let mut user_iter = BackwardUserIterator::new(
976            merge_iter,
977            user_key_range,
978            epoch,
979            min_epoch,
980            Some(committed),
981        );
982        user_iter.rewind().await?;
983        Ok(HummockStorageRevIteratorInner::new(
984            user_iter,
985            self.state_store_metrics.clone(),
986            table_id,
987            local_stats,
988        ))
989    }
990
    /// Registers with `factory` every sub-iterator needed to scan `table_key_range` of
    /// `table_id`. Shared by the forward and backward iterator constructors.
    ///
    /// Steps, in order:
    /// 0. Validate the key range: when both bounds are bounded and `right < left`, panic in
    ///    debug builds and return an error otherwise.
    /// 1. Add one batch iterator per immutable memtable in `imms`.
    /// 2. Add one staging SST iterator per entry of `uncommitted_ssts` that is not rejected
    ///    by the prefix-hint filter.
    /// 3. Walk the levels of `committed` for `table_id`:
    ///    - nonoverlapping level with more than one pruned SST: one concat iterator;
    ///    - nonoverlapping level with exactly one pruned SST: a single SST iterator,
    ///      registered through `add_overlapping_sst_iter`;
    ///    - any other level type: one SST iterator per pruned SST, visited in reverse order.
    ///
    /// Iterator counts are accumulated into `local_stats`. `epoch` is only used in the
    /// slow-fetch warning emitted when step 3 takes longer than
    /// `SLOW_ITER_FETCH_META_DURATION_SECOND` seconds.
    ///
    /// # Errors
    ///
    /// Returns an error for an invalid key range (release builds) or when loading SST
    /// metadata from the sstable store fails.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        // 0. sanity-check the requested key range
        {
            // Returns the key carried by a bounded end, or `None` for `Unbounded`.
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                // Fail loudly in debug builds; surface a regular error in release builds.
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        // 1. build iterators from the immutable memtables
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterators from the uncommitted (staging) SSTs
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // Hash the prefix hint once and reuse it for every SST filter check below.
        let filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            // When prefetching, pass the end user key of the range and the configured
            // preload retry limit to the SST iterators.
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        // Shared by every SST iterator created below.
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            // Skip the SST when a prefix hint is given and the filter check misses.
            if let Some(prefix_hash) = filter_prefix_hash.as_ref()
                && !hit_sstable_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // 3. build iterators from the committed version
        // The timer covers only this step; its duration feeds the slow-fetch warning below.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                // Nothing in this level survives pruning.
                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Several SSTs of one nonoverlapping level are read through a single
                    // concat iterator; no per-SST filter check is done on this path.
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable_info = &sstable_infos[0];

                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;

                    // Skip the SST when a prefix hint is given and the filter check misses.
                    if let Some(dist_hash) = filter_prefix_hash.as_ref()
                        && !hit_sstable_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping level: visit the pruned SSTs in reverse order.
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    // The store must hand back the object that was asked for.
                    assert_eq!(sstable_info.object_id, sstable.id);
                    // Skip the SST when a prefix hint is given and the filter check misses.
                    if let Some(dist_hash) = filter_prefix_hash.as_ref()
                        && !hit_sstable_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        // Warn and update the gauge when fetching committed SST metadata was slow.
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1182
1183    pub async fn iter_log(
1184        &self,
1185        epoch_range: (u64, u64),
1186        key_range: TableKeyRange,
1187        options: ReadLogOptions,
1188        table_change_log_manager: Arc<TableChangeLogManager>,
1189    ) -> HummockResult<ChangeLogIterator> {
1190        // The end value of `epoch_range` is not greater than max committed epoch, guaranteed by the caller `BatchTableInnerIterLogInner`.
1191        let change_log: Vec<_> = {
1192            let table_change_logs = table_change_log_manager
1193                .fetch_table_change_logs(options.table_id, epoch_range, false, None)
1194                .await?;
1195            if let Some(change_log) = table_change_logs.get(&options.table_id) {
1196                change_log.filter_epoch(epoch_range).cloned().collect_vec()
1197            } else {
1198                Vec::new()
1199            }
1200        };
1201
1202        if let Some(max_epoch_change_log) = change_log.last() {
1203            let (_, max_epoch) = epoch_range;
1204            if !max_epoch_change_log.epochs().contains(&max_epoch) {
1205                warn!(
1206                    max_epoch,
1207                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
1208                    table_id = %options.table_id,
1209                    "max_epoch does not exist"
1210                );
1211            }
1212        }
1213        let read_options = Arc::new(SstableIteratorReadOptions {
1214            cache_policy: Default::default(),
1215            must_iterated_end_user_key: None,
1216            max_preload_retry_times: 0,
1217            prefetch_for_large_query: false,
1218        });
1219
1220        async fn make_iter(
1221            sstable_infos: impl Iterator<Item = &SstableInfo>,
1222            sstable_store: &SstableStoreRef,
1223            read_options: Arc<SstableIteratorReadOptions>,
1224            local_stat: &mut StoreLocalStatistic,
1225        ) -> HummockResult<MergeIterator<SstableIterator>> {
1226            let iters = try_join_all(sstable_infos.map(|sstable_info| {
1227                let sstable_store = sstable_store.clone();
1228                let read_options = read_options.clone();
1229                async move {
1230                    let mut local_stat = StoreLocalStatistic::default();
1231                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1232                    Ok::<_, HummockError>((
1233                        SstableIterator::new(
1234                            table_holder,
1235                            sstable_store,
1236                            read_options,
1237                            sstable_info,
1238                        ),
1239                        local_stat,
1240                    ))
1241                }
1242            }))
1243            .await?;
1244            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1245                |(iter, stats)| {
1246                    local_stat.add(&stats);
1247                    iter
1248                },
1249            )))
1250        }
1251
1252        let mut local_stat = StoreLocalStatistic::default();
1253
1254        let new_value_iter = make_iter(
1255            change_log
1256                .iter()
1257                .flat_map(|log| log.new_value.iter())
1258                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1259            &self.sstable_store,
1260            read_options.clone(),
1261            &mut local_stat,
1262        )
1263        .await?;
1264        let old_value_iter = make_iter(
1265            change_log
1266                .iter()
1267                .flat_map(|log| log.old_value.iter())
1268                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1269            &self.sstable_store,
1270            read_options.clone(),
1271            &mut local_stat,
1272        )
1273        .await?;
1274        ChangeLogIterator::new(
1275            epoch_range,
1276            key_range,
1277            new_value_iter,
1278            old_value_iter,
1279            options.table_id,
1280            IterLocalMetricsGuard::new(
1281                self.state_store_metrics.clone(),
1282                options.table_id,
1283                local_stat,
1284            ),
1285        )
1286        .await
1287    }
1288
1289    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
1290        &'a self,
1291        version: PinnedVersion,
1292        table_id: TableId,
1293        target: VectorRef<'a>,
1294        options: VectorNearestOptions,
1295        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1296    ) -> HummockResult<Vec<O>> {
1297        let Some(index) = version.vector_indexes.get(&table_id) else {
1298            return Ok(vec![]);
1299        };
1300        if target.dimension() != index.dimension {
1301            return Err(HummockError::other(format!(
1302                "target dimension {} not match index dimension {}",
1303                target.dimension(),
1304                index.dimension
1305            )));
1306        }
1307        match &index.inner {
1308            VectorIndexImpl::Flat(flat) => {
1309                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
1310                let mut cache_stat = VectorStoreCacheStats::default();
1311                for vector_file in &flat.vector_store_info.vector_files {
1312                    let meta = self
1313                        .sstable_store
1314                        .get_vector_file_meta(vector_file, &mut cache_stat)
1315                        .await?;
1316                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
1317                        let block = self
1318                            .sstable_store
1319                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
1320                            .await?;
1321                        builder.add(&**block, &on_nearest_item_fn);
1322                    }
1323                }
1324                cache_stat.report(table_id, "flat", self.stats());
1325                Ok(builder.finish())
1326            }
1327            VectorIndexImpl::HnswFlat(hnsw_flat) => {
1328                let Some(graph_file) = &hnsw_flat.graph_file else {
1329                    return Ok(vec![]);
1330                };
1331
1332                let mut ctx = FileVectorStoreCtx::default();
1333
1334                let graph = self
1335                    .sstable_store
1336                    .get_hnsw_graph(graph_file, &mut ctx.stats)
1337                    .await?;
1338
1339                let vector_store =
1340                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
1341                let (items, stats) = nearest::<O, M, _>(
1342                    &vector_store,
1343                    &mut ctx,
1344                    &*graph,
1345                    target,
1346                    on_nearest_item_fn,
1347                    options.hnsw_ef_search,
1348                    options.top_n,
1349                )
1350                .await?;
1351                ctx.stats.report(table_id, "hnsw_read", self.stats());
1352                report_hnsw_stat(
1353                    self.stats(),
1354                    table_id,
1355                    "hnsw_read",
1356                    options.top_n,
1357                    options.hnsw_ef_search,
1358                    [stats],
1359                );
1360                Ok(items)
1361            }
1362        }
1363    }
1364}
1365
1366#[cfg(test)]
1367mod tests {
1368    use std::collections::{BTreeMap, HashMap, HashSet};
1369    use std::sync::Arc;
1370
1371    use bytes::Bytes;
1372    use prometheus::Registry;
1373    use risingwave_common::catalog::{TableId, TableOption};
1374    use risingwave_common::config::MetricLevel;
1375    use risingwave_common::hash::VirtualNode;
1376    use risingwave_common::util::epoch::test_epoch;
1377    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
1378    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_bytes};
1379    use risingwave_hummock_sdk::key_range::KeyRange;
1380    use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
1381    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
1382    use risingwave_hummock_sdk::version::HummockVersion;
1383    use risingwave_hummock_sdk::{EpochWithGap, HummockSstableObjectId};
1384    use risingwave_pb::hummock::hummock_version::PbLevels;
1385    use risingwave_pb::hummock::{
1386        LevelType as PbLevelType, PbBloomFilterType, PbHummockVersion, PbLevel, PbOverlappingLevel,
1387        PbStateTableInfo, StateTableInfoDelta,
1388    };
1389    use tokio::sync::mpsc::unbounded_channel;
1390
1391    use crate::hummock::HummockValue;
1392    use crate::hummock::iterator::test_utils::mock_sstable_store;
1393    use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
1394    use crate::hummock::store::version::{CommittedVersion, HummockVersionReader};
1395    use crate::hummock::test_utils::{
1396        default_builder_opt_for_test, gen_test_sstable_with_table_ids,
1397    };
1398    use crate::monitor::{HummockStateStoreMetrics, flush_local_metrics_for_test};
1399    use crate::store::ReadOptions;
1400
1401    /// In a nonoverlapping level, `search_sst_idx` may locate an SST whose key range covers
1402    /// the query user key but whose `table_ids` does not contain the queried table. The get
1403    /// path must skip such SSTs instead of reading them.
1404    #[tokio::test]
1405    async fn test_get_skips_sst_by_table_id_filter() {
1406        let query_table_id = TableId::new(100);
1407        let epoch: u64 = (31 * 1000) << 16;
1408        let compaction_group_id = StaticCompactionGroupId::StateDefault;
1409
1410        // SST key range: table_id 50..150, but table_ids = [50, 150] (no 100).
1411        let sst_info = SstableInfoInner {
1412            sst_id: 1.into(),
1413            object_id: 1.into(),
1414            key_range: KeyRange {
1415                left: Bytes::from(
1416                    FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
1417                ),
1418                right: Bytes::from(
1419                    FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
1420                ),
1421                right_exclusive: false,
1422            },
1423            table_ids: vec![TableId::new(50), TableId::new(150)],
1424            file_size: 1024,
1425            ..Default::default()
1426        }
1427        .into();
1428
1429        let level = Level {
1430            level_idx: 1,
1431            level_type: PbLevelType::Nonoverlapping,
1432            table_infos: vec![sst_info],
1433            total_file_size: 0,
1434            sub_level_id: 0,
1435            uncompressed_file_size: 0,
1436            vnode_partition_count: 0,
1437        };
1438
1439        #[allow(deprecated)]
1440        let levels = Levels {
1441            levels: vec![level],
1442            l0: OverlappingLevel::default(),
1443            group_id: compaction_group_id,
1444            parent_group_id: compaction_group_id,
1445            member_table_ids: vec![],
1446            compaction_group_version_id: 0,
1447        };
1448
1449        let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
1450            id: 1u64.into(),
1451            ..Default::default()
1452        });
1453        version.levels.insert(compaction_group_id, levels);
1454        version.state_table_info.apply_delta(
1455            &HashMap::from([(
1456                query_table_id,
1457                StateTableInfoDelta {
1458                    committed_epoch: epoch,
1459                    compaction_group_id,
1460                },
1461            )]),
1462            &HashSet::new(),
1463        );
1464
1465        let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
1466        let reader = HummockVersionReader::new(
1467            mock_sstable_store().await,
1468            Arc::new(HummockStateStoreMetrics::unused()),
1469            0,
1470        );
1471
1472        let result = reader
1473            .get(
1474                TableKey(Bytes::from("test_key")),
1475                epoch,
1476                query_table_id,
1477                TableOption::default(),
1478                ReadOptions::default(),
1479                (vec![], vec![], pinned_version),
1480                |_key, _value| Ok(()),
1481            )
1482            .await
1483            .unwrap();
1484
1485        assert!(result.is_none());
1486    }
1487
1488    /// Build a committed version containing a single SST with custom vnode stats.
1489    #[allow(deprecated)]
1490    fn build_version_with_vnode_stats(
1491        table_id: TableId,
1492        vnode_stats: VnodeStatistics,
1493        key_range: (Vec<u8>, Vec<u8>),
1494        level_type: PbLevelType,
1495    ) -> (SstableInfo, CommittedVersion) {
1496        let object_id = HummockSstableObjectId::new(1);
1497        let left_full_key = FullKey::new_with_gap_epoch(
1498            table_id,
1499            TableKey(Bytes::from(key_range.0)),
1500            EpochWithGap::new_from_epoch(test_epoch(0)),
1501        )
1502        .encode();
1503        let right_full_key = FullKey::new_with_gap_epoch(
1504            table_id,
1505            TableKey(Bytes::from(key_range.1)),
1506            EpochWithGap::new_from_epoch(test_epoch(0)),
1507        )
1508        .encode();
1509
1510        let sstable_info: SstableInfo = SstableInfoInner {
1511            object_id,
1512            sst_id: object_id.as_raw_id().into(),
1513            key_range: KeyRange {
1514                left: Bytes::from(left_full_key),
1515                right: Bytes::from(right_full_key),
1516                right_exclusive: false,
1517            },
1518            file_size: 1,
1519            table_ids: vec![table_id],
1520            meta_offset: 0,
1521            stale_key_count: 0,
1522            total_key_count: 0,
1523            min_epoch: 0,
1524            max_epoch: 0,
1525            uncompressed_file_size: 0,
1526            range_tombstone_count: 0,
1527            bloom_filter_kind: PbBloomFilterType::Sstable,
1528            filter_type: risingwave_pb::hummock::PbSstableFilterType::SstableFilterXor16,
1529            sst_size: 1,
1530            vnode_statistics: Some(vnode_stats),
1531        }
1532        .into();
1533        let pb_level = PbLevel {
1534            level_idx: if level_type == PbLevelType::Overlapping {
1535                0
1536            } else {
1537                1
1538            },
1539            level_type: level_type as i32,
1540            table_infos: vec![sstable_info.clone().into()],
1541            total_file_size: 1,
1542            sub_level_id: 0,
1543            uncompressed_file_size: 1,
1544            vnode_partition_count: 0,
1545        };
1546
1547        let (levels, l0) = if level_type == PbLevelType::Overlapping {
1548            (
1549                vec![],
1550                Some(PbOverlappingLevel {
1551                    sub_levels: vec![pb_level],
1552                    total_file_size: 1,
1553                    uncompressed_file_size: 1,
1554                }),
1555            )
1556        } else {
1557            (vec![pb_level], Some(PbOverlappingLevel::default()))
1558        };
1559
1560        let pb_levels = PbLevels {
1561            levels,
1562            l0,
1563            group_id: StaticCompactionGroupId::NewCompactionGroup,
1564            parent_group_id: 0.into(),
1565            member_table_ids: vec![],
1566            compaction_group_version_id: 0,
1567        };
1568
1569        let pb_version = PbHummockVersion {
1570            id: 1.into(),
1571            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1572            max_committed_epoch: 0,
1573            table_watermarks: HashMap::new(),
1574            table_change_logs: HashMap::new(),
1575            state_table_info: HashMap::from_iter([(
1576                table_id,
1577                PbStateTableInfo {
1578                    committed_epoch: 0,
1579                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1580                },
1581            )]),
1582            vector_indexes: HashMap::new(),
1583        };
1584
1585        let version = HummockVersion::from(&pb_version);
1586        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1587        let pinned = PinnedVersion::new(version, tx);
1588        (sstable_info, pinned)
1589    }
1590
1591    /// Build a committed version from an existing SST (with real object in the store).
1592    #[allow(deprecated)]
1593    fn build_version_from_sstables(
1594        table_id: TableId,
1595        sstable_infos: Vec<SstableInfo>,
1596        level_type: PbLevelType,
1597    ) -> CommittedVersion {
1598        let total_file_size = sstable_infos.iter().map(|sst| sst.file_size).sum::<u64>();
1599        let uncompressed_file_size = sstable_infos
1600            .iter()
1601            .map(|sst| sst.uncompressed_file_size)
1602            .sum::<u64>();
1603        let pb_level = PbLevel {
1604            level_idx: if level_type == PbLevelType::Overlapping {
1605                0
1606            } else {
1607                1
1608            },
1609            level_type: level_type as i32,
1610            table_infos: sstable_infos.into_iter().map(Into::into).collect(),
1611            total_file_size,
1612            sub_level_id: 0,
1613            uncompressed_file_size,
1614            vnode_partition_count: 0,
1615        };
1616
1617        let (levels, l0) = if level_type == PbLevelType::Overlapping {
1618            (
1619                vec![],
1620                Some(PbOverlappingLevel {
1621                    sub_levels: vec![pb_level],
1622                    total_file_size,
1623                    uncompressed_file_size,
1624                }),
1625            )
1626        } else {
1627            (vec![pb_level], Some(PbOverlappingLevel::default()))
1628        };
1629
1630        let pb_levels = PbLevels {
1631            levels,
1632            l0,
1633            group_id: StaticCompactionGroupId::NewCompactionGroup,
1634            parent_group_id: 0.into(),
1635            member_table_ids: vec![],
1636            compaction_group_version_id: 0,
1637        };
1638
1639        let pb_version = PbHummockVersion {
1640            id: 1.into(),
1641            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1642            max_committed_epoch: 0,
1643            table_watermarks: HashMap::new(),
1644            table_change_logs: HashMap::new(),
1645            state_table_info: HashMap::from_iter([(
1646                table_id,
1647                PbStateTableInfo {
1648                    committed_epoch: 0,
1649                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1650                },
1651            )]),
1652            vector_indexes: HashMap::new(),
1653        };
1654
1655        let version = HummockVersion::from(&pb_version);
1656        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1657        PinnedVersion::new(version, tx)
1658    }
1659
1660    /// Build a committed version from one existing non-overlapping SST.
1661    #[allow(deprecated)]
1662    fn build_version_from_sstable(
1663        table_id: TableId,
1664        sstable_info: SstableInfo,
1665    ) -> CommittedVersion {
1666        build_version_from_sstables(table_id, vec![sstable_info], PbLevelType::Nonoverlapping)
1667    }
1668
1669    fn vnode_prune_counts(
1670        metrics: &HummockStateStoreMetrics,
1671        table_id: TableId,
1672        operation: &str,
1673    ) -> (u64, u64) {
1674        let table_label = table_id.to_string();
1675        let checked = metrics
1676            .vnode_pruning_counts
1677            .with_guarded_label_values(&[
1678                table_label.clone(),
1679                operation.to_owned(),
1680                "checked".to_owned(),
1681            ])
1682            .get();
1683        let pruned = metrics
1684            .vnode_pruning_counts
1685            .with_guarded_label_values(&[table_label, operation.to_owned(), "pruned".to_owned()])
1686            .get();
1687        (checked, pruned)
1688    }
1689
1690    async fn assert_vnode_prune_get_skips_out_of_range_key(
1691        table_id: TableId,
1692        epoch: u64,
1693        level_type: PbLevelType,
1694    ) {
1695        let sstable_store = mock_sstable_store().await;
1696        let registry = Registry::new();
1697        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1698        let reader = HummockVersionReader::new(sstable_store, metrics.clone(), 0);
1699        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1700
1701        let make_user_key = |vnode: VirtualNode, suffix: &str| {
1702            let mut raw = vnode.to_be_bytes().to_vec();
1703            raw.extend_from_slice(suffix.as_bytes());
1704            UserKey::new(table_id, TableKey(raw.into()))
1705        };
1706
1707        // Stats cover vnode 1 only up to "bb".
1708        let vnode_stats = VnodeStatistics::from_map(BTreeMap::from_iter([(
1709            VirtualNode::from_index(1),
1710            (
1711                make_user_key(VirtualNode::from_index(1), "aa"),
1712                make_user_key(VirtualNode::from_index(1), "bb"),
1713            ),
1714        )]));
1715
1716        // SST key range is wide enough to include the queried key, but vnode stats should prune it.
1717        let key_range = {
1718            let mut left = VirtualNode::from_index(0).to_be_bytes().to_vec();
1719            left.extend_from_slice(b"aa");
1720            let mut right = VirtualNode::from_index(1).to_be_bytes().to_vec();
1721            right.extend_from_slice(b"zzzz");
1722            (left, right)
1723        };
1724
1725        let (_sst, committed) =
1726            build_version_with_vnode_stats(table_id, vnode_stats, key_range, level_type);
1727
1728        // Query vnode 1 but with suffix beyond the recorded max -> should be pruned.
1729        let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1730        raw.extend_from_slice(b"zz");
1731        let table_key = TableKey(Bytes::from(raw.clone()));
1732
1733        let result = reader
1734            .get(
1735                table_key,
1736                epoch,
1737                table_id,
1738                TableOption::default(),
1739                ReadOptions::default(),
1740                (vec![], vec![], committed),
1741                |_k, v| Ok(Bytes::copy_from_slice(v)),
1742            )
1743            .await
1744            .unwrap();
1745        flush_local_metrics_for_test();
1746
1747        assert!(
1748            result.is_none(),
1749            "vnode pruning should skip SST without reading data"
1750        );
1751        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1752        assert_eq!(checked_before + 1, checked_after);
1753        assert_eq!(pruned_before + 1, pruned_after);
1754    }
1755
1756    async fn assert_vnode_prune_get_not_pruned_nonoverlapping() {
1757        let table_id = TableId::new(42);
1758        let epoch = test_epoch(3);
1759        let sstable_store = mock_sstable_store().await;
1760        let registry = Registry::new();
1761        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1762        let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1763        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1764
1765        let mut opts = default_builder_opt_for_test();
1766        opts.max_vnode_key_range_bytes = None;
1767        let mut kvs = vec![
1768            (
1769                FullKey::new_with_gap_epoch(
1770                    table_id,
1771                    gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1772                    EpochWithGap::new_from_epoch(epoch),
1773                ),
1774                HummockValue::put(Bytes::from_static(b"v1")),
1775            ),
1776            (
1777                FullKey::new_with_gap_epoch(
1778                    table_id,
1779                    gen_key_from_bytes(VirtualNode::ZERO, b"cc"),
1780                    EpochWithGap::new_from_epoch(epoch),
1781                ),
1782                HummockValue::put(Bytes::from_static(b"v0")),
1783            ),
1784        ];
1785        kvs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1786        let (_, mut sstable_info): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1787            gen_test_sstable_with_table_ids(
1788                opts,
1789                10,
1790                kvs.into_iter(),
1791                sstable_store.clone(),
1792                vec![table_id.as_raw_id()],
1793            )
1794            .await;
1795        // Override vnode stats to ensure the queried key falls inside the recorded range.
1796        let mut inner = sstable_info.get_inner();
1797        inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1798            VirtualNode::from_index(1),
1799            (
1800                UserKey::new(
1801                    table_id,
1802                    gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1803                ),
1804                UserKey::new(
1805                    table_id,
1806                    gen_key_from_bytes(VirtualNode::from_index(1), b"zz"),
1807                ),
1808            ),
1809        )])));
1810        sstable_info = inner.into();
1811        let committed = build_version_from_sstable(table_id, sstable_info.clone());
1812
1813        // Key is within vnode range, should not be pruned.
1814        let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1815        raw.extend_from_slice(b"aa");
1816        let table_key = TableKey(Bytes::from(raw.clone()));
1817
1818        let result = reader
1819            .get(
1820                table_key,
1821                epoch,
1822                table_id,
1823                TableOption::default(),
1824                ReadOptions::default(),
1825                (vec![], vec![], committed),
1826                |_k, v| Ok(Bytes::copy_from_slice(v)),
1827            )
1828            .await
1829            .unwrap();
1830        flush_local_metrics_for_test();
1831        assert!(result.is_some(), "key should be read when not pruned");
1832        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1833        assert_eq!(checked_before + 1, checked_after);
1834        assert_eq!(pruned_before, pruned_after);
1835    }
1836
1837    #[tokio::test]
1838    async fn test_vnode_prune_get_single_sst_cases() {
1839        assert_vnode_prune_get_skips_out_of_range_key(
1840            TableId::default(),
1841            test_epoch(1),
1842            PbLevelType::Nonoverlapping,
1843        )
1844        .await;
1845        assert_vnode_prune_get_skips_out_of_range_key(
1846            TableId::new(7),
1847            test_epoch(2),
1848            PbLevelType::Overlapping,
1849        )
1850        .await;
1851        assert_vnode_prune_get_not_pruned_nonoverlapping().await;
1852    }
1853
1854    #[tokio::test]
1855    async fn test_vnode_prune_get_overlapping_distribution_prunes_only_out_of_range_sst() {
1856        let table_id = TableId::new(77);
1857        let epoch = test_epoch(4);
1858        let vnode = VirtualNode::from_index(1);
1859        let sstable_store = mock_sstable_store().await;
1860        let registry = Registry::new();
1861        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1862        let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1863        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1864
1865        let mut opts = default_builder_opt_for_test();
1866        opts.max_vnode_key_range_bytes = None;
1867
1868        let mut kvs1 = vec![
1869            (
1870                FullKey::new_with_gap_epoch(
1871                    table_id,
1872                    gen_key_from_bytes(vnode, b"aa"),
1873                    EpochWithGap::new_from_epoch(epoch),
1874                ),
1875                HummockValue::put(Bytes::from_static(b"s1_aa")),
1876            ),
1877            (
1878                FullKey::new_with_gap_epoch(
1879                    table_id,
1880                    gen_key_from_bytes(vnode, b"zz"),
1881                    EpochWithGap::new_from_epoch(epoch),
1882                ),
1883                HummockValue::put(Bytes::from_static(b"s1_zz")),
1884            ),
1885        ];
1886        kvs1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1887        let (_, mut sst1): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1888            gen_test_sstable_with_table_ids(
1889                opts.clone(),
1890                11,
1891                kvs1.into_iter(),
1892                sstable_store.clone(),
1893                vec![table_id.as_raw_id()],
1894            )
1895            .await;
1896        let mut sst1_inner = sst1.get_inner();
1897        sst1_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1898            vnode,
1899            (
1900                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1901                UserKey::new(table_id, gen_key_from_bytes(vnode, b"bb")),
1902            ),
1903        )])));
1904        sst1 = sst1_inner.into();
1905
1906        let mut kvs2 = vec![(
1907            FullKey::new_with_gap_epoch(
1908                table_id,
1909                gen_key_from_bytes(vnode, b"mm"),
1910                EpochWithGap::new_from_epoch(epoch),
1911            ),
1912            HummockValue::put(Bytes::from_static(b"hit")),
1913        )];
1914        kvs2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1915        let (_, mut sst2): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1916            gen_test_sstable_with_table_ids(
1917                opts,
1918                12,
1919                kvs2.into_iter(),
1920                sstable_store.clone(),
1921                vec![table_id.as_raw_id()],
1922            )
1923            .await;
1924        let mut sst2_inner = sst2.get_inner();
1925        sst2_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1926            vnode,
1927            (
1928                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1929                UserKey::new(table_id, gen_key_from_bytes(vnode, b"zz")),
1930            ),
1931        )])));
1932        sst2 = sst2_inner.into();
1933
1934        let committed =
1935            build_version_from_sstables(table_id, vec![sst1, sst2], PbLevelType::Overlapping);
1936
1937        let table_key = gen_key_from_bytes(vnode, b"mm");
1938        let result = reader
1939            .get(
1940                table_key,
1941                epoch,
1942                table_id,
1943                TableOption::default(),
1944                ReadOptions::default(),
1945                (vec![], vec![], committed),
1946                |_k, v| Ok(Bytes::copy_from_slice(v)),
1947            )
1948            .await
1949            .unwrap();
1950        flush_local_metrics_for_test();
1951
1952        assert_eq!(result, Some(Bytes::from_static(b"hit")));
1953        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1954        // First SST is pruned by vnode stats; second SST is checked and read.
1955        assert_eq!(checked_before + 2, checked_after);
1956        assert_eq!(pruned_before + 1, pruned_after);
1957    }
1958}