risingwave_storage/hummock/store/
version.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::{self};
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::array::VectorRef;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{TableId, TableOption};
29use risingwave_common::hash::VirtualNode;
30use risingwave_common::util::epoch::MAX_SPILL_TIMES;
31use risingwave_hummock_sdk::key::{
32    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
33};
34use risingwave_hummock_sdk::key_range::KeyRangeCommon;
35use risingwave_hummock_sdk::sstable_info::SstableInfo;
36use risingwave_hummock_sdk::table_watermark::{
37    TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
38};
39use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
40use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
41use risingwave_pb::hummock::LevelType;
42use sync_point::sync_point;
43use tracing::warn;
44
45use crate::error::StorageResult;
46use crate::hummock::event_handler::LocalInstanceId;
47use crate::hummock::iterator::change_log::ChangeLogIterator;
48use crate::hummock::iterator::{
49    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
50};
51use crate::hummock::local_version::pinned_version::PinnedVersion;
52use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
53use crate::hummock::sstable_store::SstableStoreRef;
54use crate::hummock::table_change_log_manager::TableChangeLogManager;
55use crate::hummock::utils::{
56    MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
57    range_overlap, search_sst_idx,
58};
59use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
60use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
61use crate::hummock::{
62    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
63    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
64    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
65    hit_sstable_bloom_filter,
66};
67use crate::mem_table::{
68    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
69};
70use crate::monitor::{
71    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
72};
73use crate::store::{
74    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
75};
76use crate::vector::hnsw::nearest;
77use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
78
/// The committed (pinned) hummock version that reads are served against.
pub type CommittedVersion = PinnedVersion;
80
/// Data not committed to Hummock. There are two types of staging data:
/// - Immutable memtable: data that has been written into local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   hummock version.

#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    // old-value ssts produced alongside `sstable_infos` (returned by `old_value_sstable_infos`)
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    // NOTE(review): presumably the total byte size of the imms this sst batch was built from
    // (callers compare it against `pending_imm_size`) — confirm against the uploader.
    imm_size: usize,
}
98
99impl StagingSstableInfo {
100    pub fn new(
101        sstable_infos: Vec<LocalSstableInfo>,
102        old_value_sstable_infos: Vec<LocalSstableInfo>,
103        epochs: Vec<HummockEpoch>,
104        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
105        imm_size: usize,
106    ) -> Self {
107        // the epochs are sorted from higher epoch to lower epoch
108        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
109        Self {
110            sstable_infos,
111            old_value_sstable_infos,
112            epochs,
113            imm_ids,
114            imm_size,
115        }
116    }
117
118    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
119        &self.sstable_infos
120    }
121
122    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
123        &self.old_value_sstable_infos
124    }
125
126    pub fn imm_size(&self) -> usize {
127        self.imm_size
128    }
129
130    pub fn epochs(&self) -> &Vec<HummockEpoch> {
131        &self.epochs
132    }
133
134    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
135        &self.imm_ids
136    }
137}
138
/// An update applied to a `HummockReadVersion` via `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// An uploaded (but uncommitted) sst; replaces the uploading imms it was built from.
    Sst(Arc<StagingSstableInfo>),
    /// A newly committed version from meta; staging data covered by it is dropped.
    CommittedSnapshot(CommittedVersion),
    /// A table watermark written at `epoch`, to be recorded in the watermark index.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
149
/// Uncommitted (staging) data of a single local state store instance: immutable memtables in
/// various stages of upload, plus uploaded-but-uncommitted ssts.
pub struct StagingVersion {
    // Total byte size of `pending_imms`; maintained by `HummockReadVersion`.
    pending_imm_size: usize,
    /// It contains the imms added but not sent to the uploader of hummock event handler.
    /// It is non-empty only when `upload_on_flush` is false.
    ///
    /// It will be sent to the uploader when `pending_imm_size` exceed threshold or on `seal_current_epoch`.
    ///
    /// newer data comes last
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// It contains the imms already sent to uploader of hummock event handler.
    /// Note: Currently, building imm and writing to staging version is not atomic, and therefore
    /// imm of smaller batch id may be added later than one with greater batch id
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    // newer data comes first
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
169
impl StagingVersion {
    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
    ///
    /// Both returned iterators yield newer data first.
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        // Re-borrow the bounds as `TableKey<&[u8]>` so they can be compared against imm key ranges.
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .map(|(imm, _)| imm)
            .rev() // rev to let newer imm come first
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        // TODO: Remove duplicate sst based on sst id
        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                // NOTE(review): `epochs` is ordered newest-first (asserted in
                // `StagingSstableInfo::new`), so `last()` is the *oldest* epoch despite the
                // `sst_max_epoch` name — confirm the intended comparison semantics.
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                // TODO: sstable info should be concat-able after each streaming table owns a read
                // version. May use concat sstable iter instead in some cases.
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    /// True when there is no staging data at all: no pending imms, no uploading imms and no
    /// uncommitted ssts.
    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}
228
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    // Set once by `init()`; staging writes assert on it.
    is_initialized: bool,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicates whether this version holds replicated data. If so, it should be
    /// ignored during global state store reads to avoid duplicated results;
    /// reading it through its own local state store is fine, since that local
    /// state store sees only this `ReadVersion`.
    is_replicated: bool,

    // Watermark index for this table, if any; kept in sync by `update`.
    table_watermarks: Option<TableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version
    // It will be initialized after local state store init
    vnodes: Arc<Bitmap>,
}
254
impl HummockReadVersion {
    /// Creates a read version backed by `committed_version`.
    ///
    /// `is_replicated` marks the version as holding replicated data, which is skipped during
    /// global state store reads to avoid duplicated results.
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // Before building a `HummockReadVersion` we need an initial version obtained from meta.
        // We want this construction to happen only after that version is initialized (currently
        // via notification), so assert here to guarantee the correct initialization order.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            // Build the watermark index from this table's committed watermarks, if any.
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
                        table_watermarks.clone(),
                        committed_version
                            .state_table_info
                            .info()
                            .get(&table_id)
                            .expect("should exist")
                            .committed_epoch,
                        table_watermarks.watermark_type,
                    )),
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    /// Creates a non-replicated read version. See [`Self::new_with_replication_option`].
    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Marks the read version as initialized. Must be called exactly once, before any writes.
    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    /// Adds an imm (with its memory tracker) to the pending list of a non-replicated version.
    ///
    /// Panics unless the new imm's `batch_id` is strictly greater than that of the newest
    /// pending or uploading imm.
    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .map(|(imm, _)| imm)
            .or_else(|| self.staging.uploading_imms.front())
        {
            // check batch_id order from newest to old
            assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push((imm, tracker));
    }

    /// Adds an imm to a replicated read version. Replicated versions never buffer pending imms,
    /// so the imm goes straight to the front (newest end) of the uploading queue.
    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        assert!(self.is_replicated);
        assert!(self.staging.pending_imms.is_empty());
        if let Some(item) = self.staging.uploading_imms.front() {
            // check batch_id order from newest to old
            assert!(item.batch_id() < imm.batch_id());
        }
        self.staging.uploading_imms.push_front(imm);
    }

    /// Total byte size of imms that are pending but not yet sent to the uploader.
    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    /// Moves all pending imms to the uploading queue and returns them (with their memory
    /// trackers) so the caller can hand them to the uploader. Resets `pending_imm_size` to 0.
    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for (imm, _) in &pending_imms {
            // `pending_imms` is ordered oldest-first, so pushing each to the front keeps
            // `uploading_imms` ordered newest-first.
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Updates the read version with `VersionUpdate`:
    /// - `VersionUpdate::Sst` -> record the uploaded sst in `staging.sst` and pop the
    ///   uploading imms the sst was built from, matched by `batch_id`
    /// - `VersionUpdate::CommittedSnapshot` -> update `committed_version`, and clean up related
    ///   `staging_sst` and `staging_imm` in memory according to the new committed epoch
    /// - `VersionUpdate::NewTableWatermark` -> record a newly written table watermark in the
    ///   watermark index
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    assert!(!self.is_replicated);
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // old data comes first
                    // Pop uploading imms from the back (oldest end) and verify each id matches.
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "miss match id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|(imm, _)| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        // Replicated versions may still hold imms covered by the new committed
                        // epoch; simply drop them.
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|(imm, _)| imm.min_epoch() > committed_epoch);
                    } else {
                        // Non-replicated versions must have had all covered imms replaced by
                        // ssts already; anything older than the committed epoch is a bug.
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm)
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    // Drop staging ssts fully covered by the new committed epoch
                    // (`epochs.first()` is the newest epoch of the sst).
                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // check epochs.last() > MCE
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                                committed_watermarks.watermark_type,
                            ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(TableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                        watermark_type,
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// We assume watermarks increase monotonically. Therefore, if the upper layer
    /// passes a regressed watermark, we filter it out here. Currently the kv log
    /// store may write a regressed watermark.
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    /// Latest watermark recorded for `vnode`, if the table has a watermark index.
    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    /// Replaces the vnode bitmap and returns the previous one.
    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    /// Whether `vnode` belongs to this read version.
    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}
560
561pub fn read_filter_for_version(
562    epoch: HummockEpoch,
563    table_id: TableId,
564    mut table_key_range: TableKeyRange,
565    read_version: &RwLock<HummockReadVersion>,
566) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
567    let read_version_guard = read_version.read();
568
569    let committed_version = read_version_guard.committed().clone();
570
571    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
572        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
573    }
574
575    let (imm_iter, sst_iter) =
576        read_version_guard
577            .staging()
578            .prune_overlap(epoch, table_id, &table_key_range);
579
580    let imms = imm_iter.cloned().collect();
581    let ssts = sst_iter.cloned().collect();
582
583    Ok((table_key_range, (imms, ssts, committed_version)))
584}
585
// Reader that serves point gets and iterators against a snapshotted `ReadVersionTuple`;
// shared across `batch_query` and `streaming_query` (see the impl below).
#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    // NOTE(review): presumably the retry count for sstable preloading — usage is outside
    // this view, confirm before relying on it.
    preload_retry_times: usize,
}
594
595/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
596/// `streaming_query`
597impl HummockVersionReader {
598    pub fn new(
599        sstable_store: SstableStoreRef,
600        state_store_metrics: Arc<HummockStateStoreMetrics>,
601        preload_retry_times: usize,
602    ) -> Self {
603        Self {
604            sstable_store,
605            state_store_metrics,
606            preload_retry_times,
607        }
608    }
609
610    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
611        &self.state_store_metrics
612    }
613}
614
// Threshold in seconds above which fetching sstable meta during iterator creation is
// considered slow (presumably reported via metrics/warning — usage is outside this view).
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
616
617impl HummockVersionReader {
618    fn skip_get_by_vnode_user_key_range(
619        sstable_info: &SstableInfo,
620        vnode: VirtualNode,
621        user_key: UserKey<&[u8]>,
622        local_stats: &mut StoreLocalStatistic,
623    ) -> bool {
624        if let Some(vnode_statistics) = &sstable_info.vnode_statistics {
625            // Only skip if vnode-statistics exists and key is out of range.
626            // If vnode-statistics not found, it may be due to incomplete stats (reached limit).
627            if let Some((vnode_min, vnode_max)) = vnode_statistics.get_vnode_user_key_range(vnode) {
628                local_stats.vnode_checked_get_count += 1;
629                if user_key < vnode_min.as_ref() || user_key > vnode_max.as_ref() {
630                    local_stats.vnode_pruned_get_count += 1;
631                    return true;
632                }
633            }
634        }
635        false
636    }
637
638    pub async fn get<'a, O>(
639        &'a self,
640        table_key: TableKey<Bytes>,
641        epoch: u64,
642        table_id: TableId,
643        table_option: TableOption,
644        read_options: ReadOptions,
645        read_version_tuple: ReadVersionTuple,
646        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
647    ) -> StorageResult<Option<O>> {
648        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
649
650        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
651        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
652        let local_stats = &mut stats_guard.local_stats;
653        local_stats.found_key = true;
654
655        // 1. read staging data
656        for imm in &imms {
657            // skip imm that only holding out-of-date data
658            if imm.max_epoch() < min_epoch {
659                continue;
660            }
661
662            local_stats.staging_imm_get_count += 1;
663
664            if let Some((data, data_epoch)) = get_from_batch(
665                imm,
666                TableKey(table_key.as_ref()),
667                epoch,
668                &read_options,
669                local_stats,
670            ) {
671                return Ok(if data_epoch.pure_epoch() < min_epoch {
672                    None
673                } else {
674                    data.into_user_value()
675                        .map(|v| {
676                            on_key_value_fn(
677                                FullKey::new_with_gap_epoch(
678                                    table_id,
679                                    table_key.to_ref(),
680                                    data_epoch,
681                                ),
682                                v.as_ref(),
683                            )
684                        })
685                        .transpose()?
686                });
687            }
688        }
689
690        // 2. order guarantee: imm -> sst
691        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
692            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
693        });
694
695        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
696        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
697        let full_key = FullKey::new_with_gap_epoch(
698            table_id,
699            TableKey(table_key.clone()),
700            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
701        );
702        let single_table_key_range = table_key.clone()..=table_key.clone();
703
704        // prune uncommitted ssts with the keyrange
705        let pruned_uncommitted_ssts =
706            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
707        for local_sst in pruned_uncommitted_ssts {
708            local_stats.staging_sst_get_count += 1;
709            if let Some(iter) = get_from_sstable_info(
710                self.sstable_store.clone(),
711                local_sst,
712                full_key.to_ref(),
713                &read_options,
714                dist_key_hash,
715                local_stats,
716            )
717            .await?
718            {
719                debug_assert!(iter.is_valid());
720                let data_epoch = iter.key().epoch_with_gap;
721                return Ok(if data_epoch.pure_epoch() < min_epoch {
722                    None
723                } else {
724                    iter.value()
725                        .into_user_value()
726                        .map(|v| {
727                            on_key_value_fn(
728                                FullKey::new_with_gap_epoch(
729                                    table_id,
730                                    table_key.to_ref(),
731                                    data_epoch,
732                                ),
733                                v,
734                            )
735                        })
736                        .transpose()?
737                });
738            }
739        }
740        // 3. read from committed_version sst file
741        // Because SST meta records encoded key range,
742        // the filter key needs to be encoded as well.
743        assert!(committed_version.is_valid());
744        for level in committed_version.levels(table_id) {
745            if level.table_infos.is_empty() {
746                continue;
747            }
748
749            match level.level_type {
750                LevelType::Overlapping | LevelType::Unspecified => {
751                    let sstable_infos = prune_overlapping_ssts(
752                        &level.table_infos,
753                        table_id,
754                        &single_table_key_range,
755                    );
756                    for sstable_info in sstable_infos {
757                        // filter vnode-key range that is definitely not containing the key
758                        if Self::skip_get_by_vnode_user_key_range(
759                            sstable_info,
760                            VirtualNode::from_index(full_key.user_key.get_vnode_id()),
761                            full_key.user_key.as_ref(),
762                            local_stats,
763                        ) {
764                            continue;
765                        }
766
767                        local_stats.overlapping_get_count += 1;
768                        if let Some(iter) = get_from_sstable_info(
769                            self.sstable_store.clone(),
770                            sstable_info,
771                            full_key.to_ref(),
772                            &read_options,
773                            dist_key_hash,
774                            local_stats,
775                        )
776                        .await?
777                        {
778                            debug_assert!(iter.is_valid());
779                            let data_epoch = iter.key().epoch_with_gap;
780                            return Ok(if data_epoch.pure_epoch() < min_epoch {
781                                None
782                            } else {
783                                iter.value()
784                                    .into_user_value()
785                                    .map(|v| {
786                                        on_key_value_fn(
787                                            FullKey::new_with_gap_epoch(
788                                                table_id,
789                                                table_key.to_ref(),
790                                                data_epoch,
791                                            ),
792                                            v,
793                                        )
794                                    })
795                                    .transpose()?
796                            });
797                        }
798                    }
799                }
800                LevelType::Nonoverlapping => {
801                    let mut table_info_idx =
802                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
803                    if table_info_idx == 0 {
804                        continue;
805                    }
806                    table_info_idx = table_info_idx.saturating_sub(1);
807                    let sstable_info = &level.table_infos[table_info_idx];
808
809                    if sstable_info.table_ids.binary_search(&table_id).is_err() {
810                        continue;
811                    }
812
813                    // Filter SSTs that definitely cannot contain the key.
814                    let ord = sstable_info
815                        .key_range
816                        .compare_right_with_user_key(full_key.user_key.as_ref());
817                    // the case that the key falls into the gap between two ssts
818                    if ord == Ordering::Less {
819                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
820                        continue;
821                    }
822
823                    // Filter vnode-key range that is definitely not containing the key.
824                    if Self::skip_get_by_vnode_user_key_range(
825                        sstable_info,
826                        VirtualNode::from_index(full_key.user_key.get_vnode_id()),
827                        full_key.user_key.as_ref(),
828                        local_stats,
829                    ) {
830                        continue;
831                    }
832
833                    local_stats.non_overlapping_get_count += 1;
834                    if let Some(iter) = get_from_sstable_info(
835                        self.sstable_store.clone(),
836                        sstable_info,
837                        full_key.to_ref(),
838                        &read_options,
839                        dist_key_hash,
840                        local_stats,
841                    )
842                    .await?
843                    {
844                        debug_assert!(iter.is_valid());
845                        let data_epoch = iter.key().epoch_with_gap;
846                        return Ok(if data_epoch.pure_epoch() < min_epoch {
847                            None
848                        } else {
849                            iter.value()
850                                .into_user_value()
851                                .map(|v| {
852                                    on_key_value_fn(
853                                        FullKey::new_with_gap_epoch(
854                                            table_id,
855                                            table_key.to_ref(),
856                                            data_epoch,
857                                        ),
858                                        v,
859                                    )
860                                })
861                                .transpose()?
862                        });
863                    }
864                }
865            }
866        }
867        stats_guard.local_stats.found_key = false;
868        Ok(None)
869    }
870
871    pub async fn iter(
872        &self,
873        table_key_range: TableKeyRange,
874        epoch: u64,
875        table_id: TableId,
876        table_option: TableOption,
877        read_options: ReadOptions,
878        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
879    ) -> StorageResult<HummockStorageIterator> {
880        self.iter_with_memtable(
881            table_key_range,
882            epoch,
883            table_id,
884            table_option,
885            read_options,
886            read_version_tuple,
887            None,
888        )
889        .await
890    }
891
892    pub async fn iter_with_memtable<'b>(
893        &self,
894        table_key_range: TableKeyRange,
895        epoch: u64,
896        table_id: TableId,
897        table_option: TableOption,
898        read_options: ReadOptions,
899        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
900        memtable_iter: Option<MemTableHummockIterator<'b>>,
901    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
902        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
903        let user_key_range = (
904            user_key_range_ref.0.map(|key| key.cloned()),
905            user_key_range_ref.1.map(|key| key.cloned()),
906        );
907        let mut factory = ForwardIteratorFactory::default();
908        let mut local_stats = StoreLocalStatistic::default();
909        let (imms, uncommitted_ssts, committed) = read_version_tuple;
910        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
911        self.iter_inner(
912            table_key_range,
913            epoch,
914            table_id,
915            read_options,
916            imms,
917            uncommitted_ssts,
918            &committed,
919            &mut local_stats,
920            &mut factory,
921        )
922        .await?;
923        let merge_iter = factory.build(memtable_iter);
924        // the epoch_range left bound for iterator read
925        let mut user_iter = UserIterator::new(
926            merge_iter,
927            user_key_range,
928            epoch,
929            min_epoch,
930            Some(committed),
931        );
932        user_iter.rewind().await?;
933        Ok(HummockStorageIteratorInner::new(
934            user_iter,
935            self.state_store_metrics.clone(),
936            table_id,
937            local_stats,
938        ))
939    }
940
941    pub async fn rev_iter<'b>(
942        &self,
943        table_key_range: TableKeyRange,
944        epoch: u64,
945        table_id: TableId,
946        table_option: TableOption,
947        read_options: ReadOptions,
948        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
949        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
950    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
951        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
952        let user_key_range = (
953            user_key_range_ref.0.map(|key| key.cloned()),
954            user_key_range_ref.1.map(|key| key.cloned()),
955        );
956        let mut factory = BackwardIteratorFactory::default();
957        let mut local_stats = StoreLocalStatistic::default();
958        let (imms, uncommitted_ssts, committed) = read_version_tuple;
959        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
960        self.iter_inner(
961            table_key_range,
962            epoch,
963            table_id,
964            read_options,
965            imms,
966            uncommitted_ssts,
967            &committed,
968            &mut local_stats,
969            &mut factory,
970        )
971        .await?;
972        let merge_iter = factory.build(memtable_iter);
973        // the epoch_range left bound for iterator read
974        let mut user_iter = BackwardUserIterator::new(
975            merge_iter,
976            user_key_range,
977            epoch,
978            min_epoch,
979            Some(committed),
980        );
981        user_iter.rewind().await?;
982        Ok(HummockStorageRevIteratorInner::new(
983            user_iter,
984            self.state_store_metrics.clone(),
985            table_id,
986            local_stats,
987        ))
988    }
989
    /// Collects into `factory` all sub-iterators needed to read
    /// `table_key_range` at `epoch`: one batch iterator per imm, one sstable
    /// iterator per relevant staging (uncommitted) SST, and concat/overlapping
    /// iterators for the committed levels. Read statistics are accumulated
    /// into `local_stats`; slow meta fetches are logged and reported.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        // 0. validate the key range: a bounded range whose right end precedes
        // its left end is a caller bug. Panic in debug builds; return an
        // error in release builds.
        {
            // Extracts the bound value regardless of inclusiveness.
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        // 1. add one batch iterator per immutable memtable.
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterator from committed
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // encode once
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            // Prefetching needs the definite end of the iteration range.
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        // Staging (uncommitted) SSTs: each may overlap arbitrarily with the
        // others, so each gets its own iterator. Files whose bloom filter
        // rules out the prefix hint are skipped.
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Time the committed-version meta fetches so slow reads can be
        // logged and reported below.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                // Nonoverlapping level: prune to the SSTs intersecting the
                // query range; their key ranges are disjoint and sorted.
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Multiple disjoint SSTs can be read back-to-back by a
                    // single concat iterator.
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable_info = &sstable_infos[0];

                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                // Overlapping SSTs cannot be concatenated: each one gets its
                // own iterator, again bloom-filtered by the prefix hint.
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        // Report slow meta fetching: warn once and export the cache-miss
        // count to metrics for diagnosis.
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1181
    /// Creates a change-log iterator over `key_range` for the epochs in
    /// `epoch_range`, merging the new-value and old-value SSTs recorded in
    /// the table's change log for those epochs.
    pub async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
        table_change_log_manager: Arc<TableChangeLogManager>,
    ) -> HummockResult<ChangeLogIterator> {
        // The end value of `epoch_range` is not greater than max committed epoch, guaranteed by the caller `BatchTableInnerIterLogInner`.
        let change_log: Vec<_> = {
            let table_change_logs = table_change_log_manager
                .fetch_table_change_logs(options.table_id, epoch_range, false, None)
                .await?;
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                // No change log for this table: iterate over nothing.
                Vec::new()
            }
        };

        // Sanity check: the requested max epoch should appear in the fetched
        // log entries. A mismatch is logged as a warning, not treated as an
        // error.
        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        // Change-log reads use plain, non-prefetching SST read options.
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        /// Builds a merge iterator over the given SSTs, fetching their metas
        /// concurrently and folding each per-SST statistic into `local_stat`.
        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    // Each concurrent fetch accumulates into its own local
                    // statistic; they are merged after the join below.
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        // Build one merged iterator over all new-value SSTs that may contain
        // keys in `key_range`, and likewise for the old-value SSTs.
        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }
1287
    /// Returns up to `options.top_n` items nearest to `target` in the vector
    /// index of `table_id`, each mapped through `on_nearest_item_fn`.
    ///
    /// Returns an empty result when the table has no vector index, and an
    /// error when the target's dimension does not match the index's.
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            // Flat index: scan every vector block of every vector file and
            // keep the running top-n in the builder.
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            // HNSW-flat index: search the persisted graph instead of
            // scanning all vectors.
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                // No graph file yet means the index holds no data.
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
1363}
1364
1365#[cfg(test)]
1366mod tests {
1367    use std::collections::{BTreeMap, HashMap, HashSet};
1368    use std::sync::Arc;
1369
1370    use bytes::Bytes;
1371    use prometheus::Registry;
1372    use risingwave_common::catalog::{TableId, TableOption};
1373    use risingwave_common::config::MetricLevel;
1374    use risingwave_common::hash::VirtualNode;
1375    use risingwave_common::util::epoch::test_epoch;
1376    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
1377    use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, gen_key_from_bytes};
1378    use risingwave_hummock_sdk::key_range::KeyRange;
1379    use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
1380    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner, VnodeStatistics};
1381    use risingwave_hummock_sdk::version::HummockVersion;
1382    use risingwave_hummock_sdk::{EpochWithGap, HummockSstableObjectId};
1383    use risingwave_pb::hummock::hummock_version::PbLevels;
1384    use risingwave_pb::hummock::{
1385        LevelType as PbLevelType, PbBloomFilterType, PbHummockVersion, PbLevel, PbOverlappingLevel,
1386        PbStateTableInfo, StateTableInfoDelta,
1387    };
1388    use tokio::sync::mpsc::unbounded_channel;
1389
1390    use crate::hummock::HummockValue;
1391    use crate::hummock::iterator::test_utils::mock_sstable_store;
1392    use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
1393    use crate::hummock::store::version::{CommittedVersion, HummockVersionReader};
1394    use crate::hummock::test_utils::{
1395        default_builder_opt_for_test, gen_test_sstable_with_table_ids,
1396    };
1397    use crate::monitor::{HummockStateStoreMetrics, flush_local_metrics_for_test};
1398    use crate::store::ReadOptions;
1399
1400    /// In a nonoverlapping level, `search_sst_idx` may locate an SST whose key range covers
1401    /// the query user key but whose `table_ids` does not contain the queried table. The get
1402    /// path must skip such SSTs instead of reading them.
1403    #[tokio::test]
1404    async fn test_get_skips_sst_by_table_id_filter() {
1405        let query_table_id = TableId::new(100);
1406        let epoch: u64 = (31 * 1000) << 16;
1407        let compaction_group_id = StaticCompactionGroupId::StateDefault;
1408
1409        // SST key range: table_id 50..150, but table_ids = [50, 150] (no 100).
1410        let sst_info = SstableInfoInner {
1411            sst_id: 1.into(),
1412            object_id: 1.into(),
1413            key_range: KeyRange {
1414                left: Bytes::from(
1415                    FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
1416                ),
1417                right: Bytes::from(
1418                    FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
1419                ),
1420                right_exclusive: false,
1421            },
1422            table_ids: vec![TableId::new(50), TableId::new(150)],
1423            file_size: 1024,
1424            ..Default::default()
1425        }
1426        .into();
1427
1428        let level = Level {
1429            level_idx: 1,
1430            level_type: PbLevelType::Nonoverlapping,
1431            table_infos: vec![sst_info],
1432            total_file_size: 0,
1433            sub_level_id: 0,
1434            uncompressed_file_size: 0,
1435            vnode_partition_count: 0,
1436        };
1437
1438        #[allow(deprecated)]
1439        let levels = Levels {
1440            levels: vec![level],
1441            l0: OverlappingLevel::default(),
1442            group_id: compaction_group_id,
1443            parent_group_id: compaction_group_id,
1444            member_table_ids: vec![],
1445            compaction_group_version_id: 0,
1446        };
1447
1448        let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
1449            id: 1u64.into(),
1450            ..Default::default()
1451        });
1452        version.levels.insert(compaction_group_id, levels);
1453        version.state_table_info.apply_delta(
1454            &HashMap::from([(
1455                query_table_id,
1456                StateTableInfoDelta {
1457                    committed_epoch: epoch,
1458                    compaction_group_id,
1459                },
1460            )]),
1461            &HashSet::new(),
1462        );
1463
1464        let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
1465        let reader = HummockVersionReader::new(
1466            mock_sstable_store().await,
1467            Arc::new(HummockStateStoreMetrics::unused()),
1468            0,
1469        );
1470
1471        let result = reader
1472            .get(
1473                TableKey(Bytes::from("test_key")),
1474                epoch,
1475                query_table_id,
1476                TableOption::default(),
1477                ReadOptions::default(),
1478                (vec![], vec![], pinned_version),
1479                |_key, _value| Ok(()),
1480            )
1481            .await
1482            .unwrap();
1483
1484        assert!(result.is_none());
1485    }
1486
1487    /// Build a committed version containing a single SST with custom vnode stats.
1488    #[allow(deprecated)]
1489    fn build_version_with_vnode_stats(
1490        table_id: TableId,
1491        vnode_stats: VnodeStatistics,
1492        key_range: (Vec<u8>, Vec<u8>),
1493        level_type: PbLevelType,
1494    ) -> (SstableInfo, CommittedVersion) {
1495        let object_id = HummockSstableObjectId::new(1);
1496        let left_full_key = FullKey::new_with_gap_epoch(
1497            table_id,
1498            TableKey(Bytes::from(key_range.0)),
1499            EpochWithGap::new_from_epoch(test_epoch(0)),
1500        )
1501        .encode();
1502        let right_full_key = FullKey::new_with_gap_epoch(
1503            table_id,
1504            TableKey(Bytes::from(key_range.1)),
1505            EpochWithGap::new_from_epoch(test_epoch(0)),
1506        )
1507        .encode();
1508
1509        let sstable_info: SstableInfo = SstableInfoInner {
1510            object_id,
1511            sst_id: object_id.as_raw_id().into(),
1512            key_range: KeyRange {
1513                left: Bytes::from(left_full_key),
1514                right: Bytes::from(right_full_key),
1515                right_exclusive: false,
1516            },
1517            file_size: 1,
1518            table_ids: vec![table_id],
1519            meta_offset: 0,
1520            stale_key_count: 0,
1521            total_key_count: 0,
1522            min_epoch: 0,
1523            max_epoch: 0,
1524            uncompressed_file_size: 0,
1525            range_tombstone_count: 0,
1526            bloom_filter_kind: PbBloomFilterType::Sstable,
1527            sst_size: 1,
1528            vnode_statistics: Some(vnode_stats),
1529        }
1530        .into();
1531        let pb_level = PbLevel {
1532            level_idx: if level_type == PbLevelType::Overlapping {
1533                0
1534            } else {
1535                1
1536            },
1537            level_type: level_type as i32,
1538            table_infos: vec![sstable_info.clone().into()],
1539            total_file_size: 1,
1540            sub_level_id: 0,
1541            uncompressed_file_size: 1,
1542            vnode_partition_count: 0,
1543        };
1544
1545        let (levels, l0) = if level_type == PbLevelType::Overlapping {
1546            (
1547                vec![],
1548                Some(PbOverlappingLevel {
1549                    sub_levels: vec![pb_level],
1550                    total_file_size: 1,
1551                    uncompressed_file_size: 1,
1552                }),
1553            )
1554        } else {
1555            (vec![pb_level], Some(PbOverlappingLevel::default()))
1556        };
1557
1558        let pb_levels = PbLevels {
1559            levels,
1560            l0,
1561            group_id: StaticCompactionGroupId::NewCompactionGroup,
1562            parent_group_id: 0.into(),
1563            member_table_ids: vec![],
1564            compaction_group_version_id: 0,
1565        };
1566
1567        let pb_version = PbHummockVersion {
1568            id: 1.into(),
1569            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1570            max_committed_epoch: 0,
1571            table_watermarks: HashMap::new(),
1572            table_change_logs: HashMap::new(),
1573            state_table_info: HashMap::from_iter([(
1574                table_id,
1575                PbStateTableInfo {
1576                    committed_epoch: 0,
1577                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1578                },
1579            )]),
1580            vector_indexes: HashMap::new(),
1581        };
1582
1583        let version = HummockVersion::from(&pb_version);
1584        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1585        let pinned = PinnedVersion::new(version, tx);
1586        (sstable_info, pinned)
1587    }
1588
1589    /// Build a committed version from an existing SST (with real object in the store).
1590    #[allow(deprecated)]
1591    fn build_version_from_sstables(
1592        table_id: TableId,
1593        sstable_infos: Vec<SstableInfo>,
1594        level_type: PbLevelType,
1595    ) -> CommittedVersion {
1596        let total_file_size = sstable_infos.iter().map(|sst| sst.file_size).sum::<u64>();
1597        let uncompressed_file_size = sstable_infos
1598            .iter()
1599            .map(|sst| sst.uncompressed_file_size)
1600            .sum::<u64>();
1601        let pb_level = PbLevel {
1602            level_idx: if level_type == PbLevelType::Overlapping {
1603                0
1604            } else {
1605                1
1606            },
1607            level_type: level_type as i32,
1608            table_infos: sstable_infos.into_iter().map(Into::into).collect(),
1609            total_file_size,
1610            sub_level_id: 0,
1611            uncompressed_file_size,
1612            vnode_partition_count: 0,
1613        };
1614
1615        let (levels, l0) = if level_type == PbLevelType::Overlapping {
1616            (
1617                vec![],
1618                Some(PbOverlappingLevel {
1619                    sub_levels: vec![pb_level],
1620                    total_file_size,
1621                    uncompressed_file_size,
1622                }),
1623            )
1624        } else {
1625            (vec![pb_level], Some(PbOverlappingLevel::default()))
1626        };
1627
1628        let pb_levels = PbLevels {
1629            levels,
1630            l0,
1631            group_id: StaticCompactionGroupId::NewCompactionGroup,
1632            parent_group_id: 0.into(),
1633            member_table_ids: vec![],
1634            compaction_group_version_id: 0,
1635        };
1636
1637        let pb_version = PbHummockVersion {
1638            id: 1.into(),
1639            levels: HashMap::from_iter([(StaticCompactionGroupId::NewCompactionGroup, pb_levels)]),
1640            max_committed_epoch: 0,
1641            table_watermarks: HashMap::new(),
1642            table_change_logs: HashMap::new(),
1643            state_table_info: HashMap::from_iter([(
1644                table_id,
1645                PbStateTableInfo {
1646                    committed_epoch: 0,
1647                    compaction_group_id: StaticCompactionGroupId::NewCompactionGroup,
1648                },
1649            )]),
1650            vector_indexes: HashMap::new(),
1651        };
1652
1653        let version = HummockVersion::from(&pb_version);
1654        let (tx, _rx) = unbounded_channel::<PinVersionAction>();
1655        PinnedVersion::new(version, tx)
1656    }
1657
1658    /// Build a committed version from one existing non-overlapping SST.
1659    #[allow(deprecated)]
1660    fn build_version_from_sstable(
1661        table_id: TableId,
1662        sstable_info: SstableInfo,
1663    ) -> CommittedVersion {
1664        build_version_from_sstables(table_id, vec![sstable_info], PbLevelType::Nonoverlapping)
1665    }
1666
1667    fn vnode_prune_counts(
1668        metrics: &HummockStateStoreMetrics,
1669        table_id: TableId,
1670        operation: &str,
1671    ) -> (u64, u64) {
1672        let table_label = table_id.to_string();
1673        let checked = metrics
1674            .vnode_pruning_counts
1675            .with_guarded_label_values(&[
1676                table_label.clone(),
1677                operation.to_owned(),
1678                "checked".to_owned(),
1679            ])
1680            .get();
1681        let pruned = metrics
1682            .vnode_pruning_counts
1683            .with_guarded_label_values(&[table_label, operation.to_owned(), "pruned".to_owned()])
1684            .get();
1685        (checked, pruned)
1686    }
1687
1688    async fn assert_vnode_prune_get_skips_out_of_range_key(
1689        table_id: TableId,
1690        epoch: u64,
1691        level_type: PbLevelType,
1692    ) {
1693        let sstable_store = mock_sstable_store().await;
1694        let registry = Registry::new();
1695        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1696        let reader = HummockVersionReader::new(sstable_store, metrics.clone(), 0);
1697        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1698
1699        let make_user_key = |vnode: VirtualNode, suffix: &str| {
1700            let mut raw = vnode.to_be_bytes().to_vec();
1701            raw.extend_from_slice(suffix.as_bytes());
1702            UserKey::new(table_id, TableKey(raw.into()))
1703        };
1704
1705        // Stats cover vnode 1 only up to "bb".
1706        let vnode_stats = VnodeStatistics::from_map(BTreeMap::from_iter([(
1707            VirtualNode::from_index(1),
1708            (
1709                make_user_key(VirtualNode::from_index(1), "aa"),
1710                make_user_key(VirtualNode::from_index(1), "bb"),
1711            ),
1712        )]));
1713
1714        // SST key range is wide enough to include the queried key, but vnode stats should prune it.
1715        let key_range = {
1716            let mut left = VirtualNode::from_index(0).to_be_bytes().to_vec();
1717            left.extend_from_slice(b"aa");
1718            let mut right = VirtualNode::from_index(1).to_be_bytes().to_vec();
1719            right.extend_from_slice(b"zzzz");
1720            (left, right)
1721        };
1722
1723        let (_sst, committed) =
1724            build_version_with_vnode_stats(table_id, vnode_stats, key_range, level_type);
1725
1726        // Query vnode 1 but with suffix beyond the recorded max -> should be pruned.
1727        let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1728        raw.extend_from_slice(b"zz");
1729        let table_key = TableKey(Bytes::from(raw.clone()));
1730
1731        let result = reader
1732            .get(
1733                table_key,
1734                epoch,
1735                table_id,
1736                TableOption::default(),
1737                ReadOptions::default(),
1738                (vec![], vec![], committed),
1739                |_k, v| Ok(Bytes::copy_from_slice(v)),
1740            )
1741            .await
1742            .unwrap();
1743        flush_local_metrics_for_test();
1744
1745        assert!(
1746            result.is_none(),
1747            "vnode pruning should skip SST without reading data"
1748        );
1749        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1750        assert_eq!(checked_before + 1, checked_after);
1751        assert_eq!(pruned_before + 1, pruned_after);
1752    }
1753
1754    async fn assert_vnode_prune_get_not_pruned_nonoverlapping() {
1755        let table_id = TableId::new(42);
1756        let epoch = test_epoch(3);
1757        let sstable_store = mock_sstable_store().await;
1758        let registry = Registry::new();
1759        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1760        let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1761        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1762
1763        let mut opts = default_builder_opt_for_test();
1764        opts.max_vnode_key_range_bytes = None;
1765        let mut kvs = vec![
1766            (
1767                FullKey::new_with_gap_epoch(
1768                    table_id,
1769                    gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1770                    EpochWithGap::new_from_epoch(epoch),
1771                ),
1772                HummockValue::put(Bytes::from_static(b"v1")),
1773            ),
1774            (
1775                FullKey::new_with_gap_epoch(
1776                    table_id,
1777                    gen_key_from_bytes(VirtualNode::ZERO, b"cc"),
1778                    EpochWithGap::new_from_epoch(epoch),
1779                ),
1780                HummockValue::put(Bytes::from_static(b"v0")),
1781            ),
1782        ];
1783        kvs.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1784        let (_, mut sstable_info): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1785            gen_test_sstable_with_table_ids(
1786                opts,
1787                10,
1788                kvs.into_iter(),
1789                sstable_store.clone(),
1790                vec![table_id.as_raw_id()],
1791            )
1792            .await;
1793        // Override vnode stats to ensure the queried key falls inside the recorded range.
1794        let mut inner = sstable_info.get_inner();
1795        inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1796            VirtualNode::from_index(1),
1797            (
1798                UserKey::new(
1799                    table_id,
1800                    gen_key_from_bytes(VirtualNode::from_index(1), b"aa"),
1801                ),
1802                UserKey::new(
1803                    table_id,
1804                    gen_key_from_bytes(VirtualNode::from_index(1), b"zz"),
1805                ),
1806            ),
1807        )])));
1808        sstable_info = inner.into();
1809        let committed = build_version_from_sstable(table_id, sstable_info.clone());
1810
1811        // Key is within vnode range, should not be pruned.
1812        let mut raw = VirtualNode::from_index(1).to_be_bytes().to_vec();
1813        raw.extend_from_slice(b"aa");
1814        let table_key = TableKey(Bytes::from(raw.clone()));
1815
1816        let result = reader
1817            .get(
1818                table_key,
1819                epoch,
1820                table_id,
1821                TableOption::default(),
1822                ReadOptions::default(),
1823                (vec![], vec![], committed),
1824                |_k, v| Ok(Bytes::copy_from_slice(v)),
1825            )
1826            .await
1827            .unwrap();
1828        flush_local_metrics_for_test();
1829        assert!(result.is_some(), "key should be read when not pruned");
1830        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1831        assert_eq!(checked_before + 1, checked_after);
1832        assert_eq!(pruned_before, pruned_after);
1833    }
1834
1835    #[tokio::test]
1836    async fn test_vnode_prune_get_single_sst_cases() {
1837        assert_vnode_prune_get_skips_out_of_range_key(
1838            TableId::default(),
1839            test_epoch(1),
1840            PbLevelType::Nonoverlapping,
1841        )
1842        .await;
1843        assert_vnode_prune_get_skips_out_of_range_key(
1844            TableId::new(7),
1845            test_epoch(2),
1846            PbLevelType::Overlapping,
1847        )
1848        .await;
1849        assert_vnode_prune_get_not_pruned_nonoverlapping().await;
1850    }
1851
1852    #[tokio::test]
1853    async fn test_vnode_prune_get_overlapping_distribution_prunes_only_out_of_range_sst() {
1854        let table_id = TableId::new(77);
1855        let epoch = test_epoch(4);
1856        let vnode = VirtualNode::from_index(1);
1857        let sstable_store = mock_sstable_store().await;
1858        let registry = Registry::new();
1859        let metrics = Arc::new(HummockStateStoreMetrics::new(&registry, MetricLevel::Debug));
1860        let reader = HummockVersionReader::new(sstable_store.clone(), metrics.clone(), 0);
1861        let (checked_before, pruned_before) = vnode_prune_counts(&metrics, table_id, "get");
1862
1863        let mut opts = default_builder_opt_for_test();
1864        opts.max_vnode_key_range_bytes = None;
1865
1866        let mut kvs1 = vec![
1867            (
1868                FullKey::new_with_gap_epoch(
1869                    table_id,
1870                    gen_key_from_bytes(vnode, b"aa"),
1871                    EpochWithGap::new_from_epoch(epoch),
1872                ),
1873                HummockValue::put(Bytes::from_static(b"s1_aa")),
1874            ),
1875            (
1876                FullKey::new_with_gap_epoch(
1877                    table_id,
1878                    gen_key_from_bytes(vnode, b"zz"),
1879                    EpochWithGap::new_from_epoch(epoch),
1880                ),
1881                HummockValue::put(Bytes::from_static(b"s1_zz")),
1882            ),
1883        ];
1884        kvs1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1885        let (_, mut sst1): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1886            gen_test_sstable_with_table_ids(
1887                opts.clone(),
1888                11,
1889                kvs1.into_iter(),
1890                sstable_store.clone(),
1891                vec![table_id.as_raw_id()],
1892            )
1893            .await;
1894        let mut sst1_inner = sst1.get_inner();
1895        sst1_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1896            vnode,
1897            (
1898                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1899                UserKey::new(table_id, gen_key_from_bytes(vnode, b"bb")),
1900            ),
1901        )])));
1902        sst1 = sst1_inner.into();
1903
1904        let mut kvs2 = vec![(
1905            FullKey::new_with_gap_epoch(
1906                table_id,
1907                gen_key_from_bytes(vnode, b"mm"),
1908                EpochWithGap::new_from_epoch(epoch),
1909            ),
1910            HummockValue::put(Bytes::from_static(b"hit")),
1911        )];
1912        kvs2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
1913        let (_, mut sst2): (crate::hummock::sstable_store::TableHolder, SstableInfo) =
1914            gen_test_sstable_with_table_ids(
1915                opts,
1916                12,
1917                kvs2.into_iter(),
1918                sstable_store.clone(),
1919                vec![table_id.as_raw_id()],
1920            )
1921            .await;
1922        let mut sst2_inner = sst2.get_inner();
1923        sst2_inner.vnode_statistics = Some(VnodeStatistics::from_map(BTreeMap::from_iter([(
1924            vnode,
1925            (
1926                UserKey::new(table_id, gen_key_from_bytes(vnode, b"aa")),
1927                UserKey::new(table_id, gen_key_from_bytes(vnode, b"zz")),
1928            ),
1929        )])));
1930        sst2 = sst2_inner.into();
1931
1932        let committed =
1933            build_version_from_sstables(table_id, vec![sst1, sst2], PbLevelType::Overlapping);
1934
1935        let table_key = gen_key_from_bytes(vnode, b"mm");
1936        let result = reader
1937            .get(
1938                table_key,
1939                epoch,
1940                table_id,
1941                TableOption::default(),
1942                ReadOptions::default(),
1943                (vec![], vec![], committed),
1944                |_k, v| Ok(Bytes::copy_from_slice(v)),
1945            )
1946            .await
1947            .unwrap();
1948        flush_local_metrics_for_test();
1949
1950        assert_eq!(result, Some(Bytes::from_static(b"hit")));
1951        let (checked_after, pruned_after) = vnode_prune_counts(&metrics, table_id, "get");
1952        // First SST is pruned by vnode stats; second SST is checked and read.
1953        assert_eq!(checked_before + 2, checked_after);
1954        assert_eq!(pruned_before + 1, pruned_after);
1955    }
1956}