risingwave_storage/hummock/store/
version.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::vec_deque::VecDeque;
17use std::collections::{Bound, HashMap};
18use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use futures::future::try_join_all;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::{TableId, TableOption};
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36    TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
39use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
40use risingwave_pb::hummock::LevelType;
41use sync_point::sync_point;
42use tracing::warn;
43
44use crate::error::StorageResult;
45use crate::hummock::event_handler::LocalInstanceId;
46use crate::hummock::iterator::change_log::ChangeLogIterator;
47use crate::hummock::iterator::{
48    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
49};
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
52use crate::hummock::sstable_store::SstableStoreRef;
53use crate::hummock::utils::{
54    MemoryTracker, filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts,
55    range_overlap, search_sst_idx,
56};
57use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
58use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
59use crate::hummock::{
60    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
61    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
62    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
63    hit_sstable_bloom_filter,
64};
65use crate::mem_table::{
66    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
67};
68use crate::monitor::{
69    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
70};
71use crate::store::{
72    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
73};
74use crate::vector::hnsw::nearest;
75use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
76
/// A committed Hummock version is a pinned snapshot of the remote version.
pub type CommittedVersion = PinnedVersion;
78
/// Data not committed to Hummock. There are two types of staging data:
/// - Immutable memtable: data that has been written into local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   hummock version.

#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // sstables produced from the imms listed in `imm_ids`; newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    // presumably the sstables holding the old values of the same writes — confirm with uploader
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // ids of the imms (per local instance) that these sstables replace; newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    // total in-memory size of the imms that produced these sstables
    imm_size: usize,
}
96
impl StagingSstableInfo {
    /// Assembles the metadata of a batch of uploaded (but uncommitted) sstables.
    ///
    /// `epochs` must be non-empty and sorted from newer (higher) to older (lower)
    /// epoch, which is asserted below.
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // the epochs are sorted from higher epoch to lower epoch
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    /// Sstables of the new values; newer data comes first.
    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    /// Sstables of the old values; newer data comes first.
    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    /// Total in-memory size of the imms that produced these sstables.
    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    /// Epochs covered by the sstables; newer epoch first, never empty.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    /// Ids of the imms (per local instance) that these sstables replace; newer at the front.
    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}
136
/// Updates applied to a `HummockReadVersion` via `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// Staging data flushed to uncommitted sstables; removes the replaced uploading imms.
    Sst(Arc<StagingSstableInfo>),
    /// A new committed version; staging data at or below its committed epoch is pruned.
    CommittedSnapshot(CommittedVersion),
    /// A table watermark written at `epoch`, recorded into the watermark index.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
147
/// The staging (uncommitted) part of a read version: imms plus uploaded-but-uncommitted ssts.
pub struct StagingVersion {
    /// Total size of the imms in `pending_imms`; reset to 0 when they are sent for upload.
    pending_imm_size: usize,
    /// It contains the imms added but not sent to the uploader of hummock event handler.
    /// It is non-empty only when `upload_on_flush` is false.
    ///
    /// It will be sent to the uploader when `pending_imm_size` exceed threshold or on `seal_current_epoch`.
    ///
    /// newer data comes last
    pub pending_imms: Vec<(ImmutableMemtable, MemoryTracker)>,
    /// It contains the imms already sent to uploader of hummock event handler.
    /// Note: Currently, building imm and writing to staging version is not atomic, and therefore
    /// imm of smaller batch id may be added later than one with greater batch id
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    /// Uploaded but uncommitted sstables; newer data comes first.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
167
168impl StagingVersion {
169    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
170    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
171    pub fn prune_overlap<'a>(
172        &'a self,
173        max_epoch_inclusive: HummockEpoch,
174        table_id: TableId,
175        table_key_range: &'a TableKeyRange,
176    ) -> (
177        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
178        impl Iterator<Item = &'a SstableInfo> + 'a,
179    ) {
180        let (left, right) = table_key_range;
181        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
182        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
183        let overlapped_imms = self
184            .pending_imms
185            .iter()
186            .map(|(imm, _)| imm)
187            .rev() // rev to let newer imm come first
188            .chain(self.uploading_imms.iter())
189            .filter(move |imm| {
190                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
191                imm.min_epoch() <= max_epoch_inclusive
192                    && imm.table_id == table_id
193                    && range_overlap(
194                        &(left, right),
195                        &imm.start_table_key(),
196                        Bound::Included(&imm.end_table_key()),
197                    )
198            });
199
200        // TODO: Remove duplicate sst based on sst id
201        let overlapped_ssts = self
202            .sst
203            .iter()
204            .filter(move |staging_sst| {
205                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
206                sst_max_epoch <= max_epoch_inclusive
207            })
208            .flat_map(move |staging_sst| {
209                // TODO: sstable info should be concat-able after each streaming table owns a read
210                // version. May use concat sstable iter instead in some cases.
211                staging_sst
212                    .sstable_infos
213                    .iter()
214                    .map(|sstable| &sstable.sst_info)
215                    .filter(move |sstable: &&SstableInfo| {
216                        filter_single_sst(sstable, table_id, table_key_range)
217                    })
218            });
219        (overlapped_imms, overlapped_ssts)
220    }
221
222    pub fn is_empty(&self) -> bool {
223        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
224    }
225}
226
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    /// Table this read version serves.
    table_id: TableId,
    /// Id of the local state store instance that owns this read version.
    instance_id: LocalInstanceId,

    /// Set by `init()`; staged writes are only accepted after initialization.
    is_initialized: bool,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicate if this is replicated. If it is, we should ignore it during
    /// global state store read, to avoid duplicated results.
    /// Otherwise for local state store, it is fine, since we will see the
    /// `ReadVersion` just for that local state store.
    is_replicated: bool,

    /// Index over committed and staged table watermarks, if the table has any.
    table_watermarks: Option<TableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version
    // It will be initialized after local state store init
    vnodes: Arc<Bitmap>,
}
252
impl HummockReadVersion {
    /// Creates a read version, optionally marked as replicated.
    ///
    /// Seeds the table watermark index from `committed_version` when the table has
    /// committed watermarks. The resulting version is not yet initialized; call
    /// `init()` before staging any writes.
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // before building `HummockReadVersion`, we need an initial version obtained
        // from meta. We want this construction to happen only after the version is
        // initialized (now via notification), so assert to guarantee the correct
        // initialization order.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => Some(TableWatermarksIndex::new_committed(
                        table_watermarks.clone(),
                        committed_version
                            .state_table_info
                            .info()
                            .get(&table_id)
                            .expect("should exist")
                            .committed_epoch,
                        table_watermarks.watermark_type,
                    )),
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    /// Creates a non-replicated read version. See `new_with_replication_option`.
    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    /// Table served by this read version.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Marks the read version as initialized. Must be called exactly once.
    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    /// Appends an imm (with its memory tracker) to the pending list.
    ///
    /// Only valid on an initialized, non-replicated read version. Batch ids must be
    /// strictly increasing across all currently staged imms.
    pub fn add_pending_imm(&mut self, imm: ImmutableMemtable, tracker: MemoryTracker) {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .map(|(imm, _)| imm)
            .or_else(|| self.staging.uploading_imms.front())
        {
            // check batch_id order from newest to old
            assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push((imm, tracker));
    }

    /// Adds an imm to a replicated read version; it goes straight to `uploading_imms`
    /// (replicated versions never keep pending imms).
    pub fn add_replicated_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        assert!(self.is_replicated);
        assert!(self.staging.pending_imms.is_empty());
        if let Some(item) = self.staging.uploading_imms.front() {
            // check batch_id order from newest to old
            assert!(item.batch_id() < imm.batch_id());
        }
        self.staging.uploading_imms.push_front(imm);
    }

    /// Total size of the imms not yet sent to the uploader.
    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    /// Moves all pending imms into `uploading_imms` (newest at the front) and returns
    /// them together with their memory trackers so the caller can hand them to the
    /// uploader. Resets `pending_imm_size` to 0.
    pub fn start_upload_pending_imms(&mut self) -> Vec<(ImmutableMemtable, MemoryTracker)> {
        assert!(self.is_initialized);
        assert!(!self.is_replicated);
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for (imm, _) in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Updates the read version with `VersionUpdate`:
    /// - `VersionUpdate::Sst`: push the sst into `staging.sst` and remove the
    ///   uploading imms it replaces, matched by `batch_id`.
    /// - `VersionUpdate::CommittedSnapshot`: replace `committed_version` and clean up
    ///   related `staging_sst` and staging imms according to the committed epoch.
    /// - `VersionUpdate::NewTableWatermark`: record a newly written table watermark
    ///   in the watermark index.
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    assert!(!self.is_replicated);
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // old data comes first
                    for imm_id in imms.iter().rev() {
                        // pop uploading imms from the back (oldest) and verify they match
                        // the imm ids recorded in the sst, oldest first
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "miss match id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|(imm, _)| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        // replicated versions may lag; just drop imms covered by the
                        // new committed epoch
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|(imm, _)| imm.min_epoch() > committed_epoch);
                    } else {
                        // non-replicated versions must never hold imms at or below the
                        // committed epoch — that would indicate a lost flush
                        self.staging
                            .pending_imms
                            .iter()
                            .map(|(imm, _)| imm)
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    // drop staging ssts fully covered by the new committed epoch
                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // check epochs.last() > MCE
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
                                committed_watermarks.clone(),
                                committed_epoch,
                                committed_watermarks.watermark_type,
                            ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(TableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                        watermark_type,
                    ));
                }
            }
        }
    }

    /// Staging (uncommitted) part of this read version.
    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    /// Committed part of this read version.
    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// We have assumption that the watermark is increasing monotonically. Therefore,
    /// here if the upper layer usage has passed an regressed watermark, we should
    /// filter out the regressed watermark. Currently the kv log store may write
    /// regressed watermark
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    /// Latest watermark recorded for `vnode`, if any.
    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    /// Whether `init()` has been called.
    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    /// Whether this read version is replicated (see the field doc on the struct).
    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    /// Replaces the vnode bitmap, returning the previous one.
    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    /// Whether this read version covers `vnode`.
    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    /// Vnode bitmap covered by this read version.
    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}
558
559pub fn read_filter_for_version(
560    epoch: HummockEpoch,
561    table_id: TableId,
562    mut table_key_range: TableKeyRange,
563    read_version: &RwLock<HummockReadVersion>,
564) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
565    let read_version_guard = read_version.read();
566
567    let committed_version = read_version_guard.committed().clone();
568
569    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
570        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
571    }
572
573    let (imm_iter, sst_iter) =
574        read_version_guard
575            .staging()
576            .prune_overlap(epoch, table_id, &table_key_range);
577
578    let imms = imm_iter.cloned().collect();
579    let ssts = sst_iter.cloned().collect();
580
581    Ok((table_key_range, (imms, ssts, committed_version)))
582}
583
/// Reader that serves point-get and iterator reads over a resolved read version.
#[derive(Clone)]
pub struct HummockVersionReader {
    // handle to the sstable store used to fetch sst data/meta
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    // retry budget for preloading; consumer is below this chunk — confirm exact semantics there
    preload_retry_times: usize,
}
592
593/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
594/// `streaming_query`
595impl HummockVersionReader {
596    pub fn new(
597        sstable_store: SstableStoreRef,
598        state_store_metrics: Arc<HummockStateStoreMetrics>,
599        preload_retry_times: usize,
600    ) -> Self {
601        Self {
602            sstable_store,
603            state_store_metrics,
604            preload_retry_times,
605        }
606    }
607
608    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
609        &self.state_store_metrics
610    }
611}
612
// Threshold (seconds) above which fetching sstable meta while building an iter is
// treated as slow. NOTE(review): the consuming code is below this chunk — confirm there.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
614
615impl HummockVersionReader {
616    pub async fn get<'a, O>(
617        &'a self,
618        table_key: TableKey<Bytes>,
619        epoch: u64,
620        table_id: TableId,
621        table_option: TableOption,
622        read_options: ReadOptions,
623        read_version_tuple: ReadVersionTuple,
624        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
625    ) -> StorageResult<Option<O>> {
626        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
627
628        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
629        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
630        let local_stats = &mut stats_guard.local_stats;
631        local_stats.found_key = true;
632
633        // 1. read staging data
634        for imm in &imms {
635            // skip imm that only holding out-of-date data
636            if imm.max_epoch() < min_epoch {
637                continue;
638            }
639
640            local_stats.staging_imm_get_count += 1;
641
642            if let Some((data, data_epoch)) = get_from_batch(
643                imm,
644                TableKey(table_key.as_ref()),
645                epoch,
646                &read_options,
647                local_stats,
648            ) {
649                return Ok(if data_epoch.pure_epoch() < min_epoch {
650                    None
651                } else {
652                    data.into_user_value()
653                        .map(|v| {
654                            on_key_value_fn(
655                                FullKey::new_with_gap_epoch(
656                                    table_id,
657                                    table_key.to_ref(),
658                                    data_epoch,
659                                ),
660                                v.as_ref(),
661                            )
662                        })
663                        .transpose()?
664                });
665            }
666        }
667
668        // 2. order guarantee: imm -> sst
669        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
670            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
671        });
672
673        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
674        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
675        let full_key = FullKey::new_with_gap_epoch(
676            table_id,
677            TableKey(table_key.clone()),
678            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
679        );
680        let single_table_key_range = table_key.clone()..=table_key.clone();
681
682        // prune uncommitted ssts with the keyrange
683        let pruned_uncommitted_ssts =
684            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
685        for local_sst in pruned_uncommitted_ssts {
686            local_stats.staging_sst_get_count += 1;
687            if let Some(iter) = get_from_sstable_info(
688                self.sstable_store.clone(),
689                local_sst,
690                full_key.to_ref(),
691                &read_options,
692                dist_key_hash,
693                local_stats,
694            )
695            .await?
696            {
697                debug_assert!(iter.is_valid());
698                let data_epoch = iter.key().epoch_with_gap;
699                return Ok(if data_epoch.pure_epoch() < min_epoch {
700                    None
701                } else {
702                    iter.value()
703                        .into_user_value()
704                        .map(|v| {
705                            on_key_value_fn(
706                                FullKey::new_with_gap_epoch(
707                                    table_id,
708                                    table_key.to_ref(),
709                                    data_epoch,
710                                ),
711                                v,
712                            )
713                        })
714                        .transpose()?
715                });
716            }
717        }
718        // 3. read from committed_version sst file
719        // Because SST meta records encoded key range,
720        // the filter key needs to be encoded as well.
721        assert!(committed_version.is_valid());
722        for level in committed_version.levels(table_id) {
723            if level.table_infos.is_empty() {
724                continue;
725            }
726
727            match level.level_type {
728                LevelType::Overlapping | LevelType::Unspecified => {
729                    let sstable_infos = prune_overlapping_ssts(
730                        &level.table_infos,
731                        table_id,
732                        &single_table_key_range,
733                    );
734                    for sstable_info in sstable_infos {
735                        local_stats.overlapping_get_count += 1;
736                        if let Some(iter) = get_from_sstable_info(
737                            self.sstable_store.clone(),
738                            sstable_info,
739                            full_key.to_ref(),
740                            &read_options,
741                            dist_key_hash,
742                            local_stats,
743                        )
744                        .await?
745                        {
746                            debug_assert!(iter.is_valid());
747                            let data_epoch = iter.key().epoch_with_gap;
748                            return Ok(if data_epoch.pure_epoch() < min_epoch {
749                                None
750                            } else {
751                                iter.value()
752                                    .into_user_value()
753                                    .map(|v| {
754                                        on_key_value_fn(
755                                            FullKey::new_with_gap_epoch(
756                                                table_id,
757                                                table_key.to_ref(),
758                                                data_epoch,
759                                            ),
760                                            v,
761                                        )
762                                    })
763                                    .transpose()?
764                            });
765                        }
766                    }
767                }
768                LevelType::Nonoverlapping => {
769                    let mut table_info_idx =
770                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
771                    if table_info_idx == 0 {
772                        continue;
773                    }
774                    table_info_idx = table_info_idx.saturating_sub(1);
775
776                    if level.table_infos[table_info_idx]
777                        .table_ids
778                        .binary_search(&table_id)
779                        .is_err()
780                    {
781                        continue;
782                    }
783
784                    let ord = level.table_infos[table_info_idx]
785                        .key_range
786                        .compare_right_with_user_key(full_key.user_key.as_ref());
787                    // the case that the key falls into the gap between two ssts
788                    if ord == Ordering::Less {
789                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
790                        continue;
791                    }
792
793                    local_stats.non_overlapping_get_count += 1;
794                    if let Some(iter) = get_from_sstable_info(
795                        self.sstable_store.clone(),
796                        &level.table_infos[table_info_idx],
797                        full_key.to_ref(),
798                        &read_options,
799                        dist_key_hash,
800                        local_stats,
801                    )
802                    .await?
803                    {
804                        debug_assert!(iter.is_valid());
805                        let data_epoch = iter.key().epoch_with_gap;
806                        return Ok(if data_epoch.pure_epoch() < min_epoch {
807                            None
808                        } else {
809                            iter.value()
810                                .into_user_value()
811                                .map(|v| {
812                                    on_key_value_fn(
813                                        FullKey::new_with_gap_epoch(
814                                            table_id,
815                                            table_key.to_ref(),
816                                            data_epoch,
817                                        ),
818                                        v,
819                                    )
820                                })
821                                .transpose()?
822                        });
823                    }
824                }
825            }
826        }
827        stats_guard.local_stats.found_key = false;
828        Ok(None)
829    }
830
831    pub async fn iter(
832        &self,
833        table_key_range: TableKeyRange,
834        epoch: u64,
835        table_id: TableId,
836        table_option: TableOption,
837        read_options: ReadOptions,
838        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
839    ) -> StorageResult<HummockStorageIterator> {
840        self.iter_with_memtable(
841            table_key_range,
842            epoch,
843            table_id,
844            table_option,
845            read_options,
846            read_version_tuple,
847            None,
848        )
849        .await
850    }
851
852    pub async fn iter_with_memtable<'b>(
853        &self,
854        table_key_range: TableKeyRange,
855        epoch: u64,
856        table_id: TableId,
857        table_option: TableOption,
858        read_options: ReadOptions,
859        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
860        memtable_iter: Option<MemTableHummockIterator<'b>>,
861    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
862        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
863        let user_key_range = (
864            user_key_range_ref.0.map(|key| key.cloned()),
865            user_key_range_ref.1.map(|key| key.cloned()),
866        );
867        let mut factory = ForwardIteratorFactory::default();
868        let mut local_stats = StoreLocalStatistic::default();
869        let (imms, uncommitted_ssts, committed) = read_version_tuple;
870        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
871        self.iter_inner(
872            table_key_range,
873            epoch,
874            table_id,
875            read_options,
876            imms,
877            uncommitted_ssts,
878            &committed,
879            &mut local_stats,
880            &mut factory,
881        )
882        .await?;
883        let merge_iter = factory.build(memtable_iter);
884        // the epoch_range left bound for iterator read
885        let mut user_iter = UserIterator::new(
886            merge_iter,
887            user_key_range,
888            epoch,
889            min_epoch,
890            Some(committed),
891        );
892        user_iter.rewind().await?;
893        Ok(HummockStorageIteratorInner::new(
894            user_iter,
895            self.state_store_metrics.clone(),
896            table_id,
897            local_stats,
898        ))
899    }
900
901    pub async fn rev_iter<'b>(
902        &self,
903        table_key_range: TableKeyRange,
904        epoch: u64,
905        table_id: TableId,
906        table_option: TableOption,
907        read_options: ReadOptions,
908        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
909        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
910    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
911        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
912        let user_key_range = (
913            user_key_range_ref.0.map(|key| key.cloned()),
914            user_key_range_ref.1.map(|key| key.cloned()),
915        );
916        let mut factory = BackwardIteratorFactory::default();
917        let mut local_stats = StoreLocalStatistic::default();
918        let (imms, uncommitted_ssts, committed) = read_version_tuple;
919        let min_epoch = gen_min_epoch(epoch, table_option.retention_seconds);
920        self.iter_inner(
921            table_key_range,
922            epoch,
923            table_id,
924            read_options,
925            imms,
926            uncommitted_ssts,
927            &committed,
928            &mut local_stats,
929            &mut factory,
930        )
931        .await?;
932        let merge_iter = factory.build(memtable_iter);
933        // the epoch_range left bound for iterator read
934        let mut user_iter = BackwardUserIterator::new(
935            merge_iter,
936            user_key_range,
937            epoch,
938            min_epoch,
939            Some(committed),
940        );
941        user_iter.rewind().await?;
942        Ok(HummockStorageRevIteratorInner::new(
943            user_iter,
944            self.state_store_metrics.clone(),
945            table_id,
946            local_stats,
947        ))
948    }
949
    /// Collects into `factory` every iterator needed to read `table_key_range`
    /// at `epoch`, in three layers:
    /// 1. staging immutable memtables (`imms`),
    /// 2. uncommitted staging SSTs (`uncommitted_ssts`),
    /// 3. SSTs of the committed version (`committed`), level by level.
    ///
    /// Read statistics (iterator counts, cache misses) are accumulated into
    /// `local_stats`. An inverted key range panics in debug builds and returns
    /// an error in release builds.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        // Sanity-check that the key range is not inverted (right < left).
        {
            // Extracts the bound value for comparison; `Unbounded` never
            // participates in the inversion check.
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        // 1. add one batch iterator per staging immutable memtable.
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterator from committed
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // encode once: the prefix-hint hash is reused for every bloom-filter
        // probe below.
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        // Staging (uncommitted) SSTs: fetch meta, prune by bloom filter, then
        // register an iterator per surviving SST.
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Time the meta fetching over committed levels so slow iterator
        // creation can be reported below.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                // Prune to the SSTs whose key range intersects the query range.
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    // Multiple disjoint SSTs: read them back-to-back through a
                    // concat iterator.
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                // SSTs in an overlapping level may cover the same keys, so each
                // one gets its own iterator.
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        // Report slow meta fetching together with the number of meta-cache
        // misses that (presumably) caused it.
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1139
    /// Creates a [`ChangeLogIterator`] over the change log of `options.table_id`
    /// restricted to `epoch_range` and `key_range`.
    ///
    /// Builds one merge iterator over the new-value SSTs and one over the
    /// old-value SSTs of the matching change-log entries, then combines them.
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        // Snapshot the relevant change-log entries while holding the read lock,
        // releasing it before any I/O below.
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        // Log (but do not fail) when the requested upper epoch is not recorded
        // in the newest surviving change-log entry.
        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        // Change-log reads use plain SST iteration: no prefetch/preload.
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        /// Opens all `sstable_infos` concurrently and merges them into a single
        /// [`MergeIterator`], folding each SST's read statistics into
        /// `local_stat`.
        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    // Per-task stats: each concurrent open collects into its own
                    // accumulator; they are merged into `local_stat` afterwards.
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }
1242
    /// Searches the vector index of `table_id` for items near `target`,
    /// returning up to `options.top_n` results mapped through
    /// `on_nearest_item_fn`.
    ///
    /// Returns an empty `Vec` when the table has no vector index (or an
    /// HNSW index has no graph file yet); returns an error when `target`'s
    /// dimension does not match the index dimension.
    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                // Flat index: scan every block of every vector file, feeding
                // each block into the top-n builder.
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                // HNSW index: nothing to search until a graph file exists.
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                // Graph-guided search bounded by `hnsw_ef_search` candidates.
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
1318}
1319
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::catalog::{TableId, TableOption};
    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
    use risingwave_hummock_sdk::key::{FullKey, TableKey};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels, OverlappingLevel};
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_pb::hummock::{PbHummockVersion, PbLevelType, StateTableInfoDelta};
    use tokio::sync::mpsc::unbounded_channel;

    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::local_version.pinned_version::PinnedVersion;
    use crate::hummock::store::version::HummockVersionReader;
    use crate::monitor::HummockStateStoreMetrics;
    use crate::store::ReadOptions;

    /// In a nonoverlapping level, `search_sst_idx` may locate an SST whose key range covers
    /// the query user key but whose `table_ids` does not contain the queried table. The get
    /// path must skip such SSTs instead of reading them.
    #[tokio::test]
    async fn test_get_skips_sst_by_table_id_filter() {
        let query_table_id = TableId::new(100);
        let epoch: u64 = (31 * 1000) << 16;
        let compaction_group_id = StaticCompactionGroupId::StateDefault;

        // SST key range: table_id 50..150, but table_ids = [50, 150] (no 100).
        let sst_info = SstableInfoInner {
            sst_id: 1.into(),
            object_id: 1.into(),
            key_range: KeyRange {
                left: Bytes::from(
                    FullKey::for_test(TableId::new(50), b"aaa".to_vec(), epoch).encode(),
                ),
                right: Bytes::from(
                    FullKey::for_test(TableId::new(150), b"zzz".to_vec(), epoch).encode(),
                ),
                right_exclusive: false,
            },
            table_ids: vec![TableId::new(50), TableId::new(150)],
            file_size: 1024,
            ..Default::default()
        }
        .into();

        // A single nonoverlapping L1 containing only the covering SST.
        let level = Level {
            level_idx: 1,
            level_type: PbLevelType::Nonoverlapping,
            table_infos: vec![sst_info],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            vnode_partition_count: 0,
        };

        // `Levels` still carries deprecated fields (e.g. `member_table_ids`)
        // that must be populated in the struct literal.
        #[allow(deprecated)]
        let levels = Levels {
            levels: vec![level],
            l0: OverlappingLevel::default(),
            group_id: compaction_group_id,
            parent_group_id: compaction_group_id,
            member_table_ids: vec![],
            compaction_group_version_id: 0,
        };

        // Register the queried table with the compaction group at `epoch` so
        // the version exposes the level above for `query_table_id`.
        let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
            id: 1u64.into(),
            ..Default::default()
        });
        version.levels.insert(compaction_group_id, levels);
        version.state_table_info.apply_delta(
            &HashMap::from([(
                query_table_id,
                StateTableInfoDelta {
                    committed_epoch: epoch,
                    compaction_group_id,
                },
            )]),
            &HashSet::new(),
        );

        let pinned_version = PinnedVersion::new(version, unbounded_channel().0);
        let reader = HummockVersionReader::new(
            mock_sstable_store().await,
            Arc::new(HummockStateStoreMetrics::unused()),
            0,
        );

        let result = reader
            .get(
                TableKey(Bytes::from("test_key")),
                epoch,
                query_table_id,
                TableOption::default(),
                ReadOptions::default(),
                (vec![], vec![], pinned_version),
                |_key, _value| Ok(()),
            )
            .await
            .unwrap();

        // With the table-id filter in place, the SST is skipped and the read
        // finds nothing.
        assert!(result.is_none());
    }
}