risingwave_storage/hummock/store/version.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::vec_deque::VecDeque;
use std::ops::Bound::Included;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, Vector, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

/// Data not committed to Hummock. There are two types of staging data:
/// - Immutable memtable: data that has been written into the local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   the hummock version.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
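    /// Creates a `StagingSstableInfo`. `epochs` must be sorted from the newest
    /// epoch to the oldest, matching the assertion below.
    ///
    /// A minimal sketch (the values are illustrative only):
    ///
    /// ```ignore
    /// let staging_sst = StagingSstableInfo::new(
    ///     sstable_infos,           // new-value SSTs, newer data first
    ///     old_value_sstable_infos, // old-value SSTs for the change log
    ///     vec![3, 2, 1],           // epochs, newest first; must be non-empty
    ///     imm_ids,                 // per-instance imm ids, newest first
    ///     imm_size,
    /// );
    /// ```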
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // the epochs are sorted from the newest (highest) epoch to the oldest
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    /// Imms that have been added but not yet sent to the uploader of the hummock
    /// event handler. It is non-empty only when `upload_on_flush` is false.
    ///
    /// The imms are sent to the uploader when `pending_imm_size` exceeds the
    /// threshold or on `seal_current_epoch`.
    ///
    /// Newer data comes last.
    pub pending_imms: Vec<ImmutableMemtable>,
    /// Imms that have already been sent to the uploader of the hummock event
    /// handler. Note: currently, building an imm and writing it to the staging
    /// version is not atomic, so an imm with a smaller batch id may be added
    /// later than one with a greater batch id.
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    // newer data comes first
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
    /// Gets the `imm`s and `sst`s that overlap with `table_key_range` and with the
    /// user key range derived from `table_id`, `epoch` and `table_key_range`.
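    ///
    /// A sketch of how the read path uses it, mirroring `read_filter_for_version`:
    ///
    /// ```ignore
    /// let (imm_iter, sst_iter) =
    ///     staging.prune_overlap(read_epoch, table_id, &table_key_range);
    /// let imms: Vec<ImmutableMemtable> = imm_iter.cloned().collect();
    /// let ssts: Vec<SstableInfo> = sst_iter.cloned().collect();
    /// ```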
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev() // rev to let newer imms come first
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // retain imms that overlap with (min_epoch_exclusive, max_epoch_inclusive]
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Included(&imm.end_table_key()),
                    )
            });

        // TODO: Remove duplicate sst based on sst id
        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                // TODO: sstable info should be concat-able after each streaming table owns a read
                // version. May use concat sstable iter instead in some cases.
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

/// A container of information required for reading from hummock.
#[derive(Clone)]
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicates whether this is replicated. If it is, we should ignore it during
    /// global state store reads to avoid duplicated results. For a local state
    /// store read it is fine, since we only see the `ReadVersion` of that local
    /// state store.
    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version.
    // It will be initialized after the local state store is initialized.
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // Before building a `HummockReadVersion`, we need an initial version obtained
        // from meta. We want this initialization to happen after the version is
        // initialized (now via notification), so add an assertion to guarantee the
        // correct initialization order.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }

                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark into the index */
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            // the new imm must have a greater batch id than the newest existing imm
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

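    /// Moves all pending imms to the uploading queue and returns them so that the
    /// caller can send them to the uploader.
    ///
    /// A sketch of the flush path (`flush_threshold` and `send_imms_to_uploader`
    /// are illustrative names, not actual APIs):
    ///
    /// ```ignore
    /// if read_version.pending_imm_size() >= flush_threshold {
    ///     let imms = read_version.start_upload_pending_imms();
    ///     send_imms_to_uploader(instance_id, imms);
    /// }
    /// ```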
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Updates the read version with a `VersionUpdate`.
    /// There are three kinds of updates to be processed:
    /// - `VersionUpdate::Sst`: push the uploaded sst into `staging.sst` and remove
    ///   the corresponding imms from the uploading queue according to their batch ids
    /// - `VersionUpdate::CommittedSnapshot`: update `committed_version`, and clean up
    ///   the related staging ssts and imms in memory according to the committed epoch
    /// - `VersionUpdate::NewTableWatermark`: apply the new watermark to the table
    ///   watermark index
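    ///
    /// A sketch of how the hummock event handler drives it (illustrative only):
    ///
    /// ```ignore
    /// // when an upload task finishes:
    /// read_version.update(VersionUpdate::Sst(staging_sstable_info));
    /// // when a new version is committed:
    /// read_version.update(VersionUpdate::CommittedSnapshot(new_pinned_version));
    /// ```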
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // old data comes first
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched ids {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // check epochs.last() > MCE
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// We assume that watermarks increase monotonically. Therefore, if the upper
    /// layer passes in a regressed watermark, we should filter it out. Currently
    /// the kv log store may write regressed watermarks.
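    ///
    /// A sketch (illustrative): regressed entries are dropped from `watermarks`
    /// in place.
    ///
    /// ```ignore
    /// read_version.filter_regress_watermarks(&mut vnode_watermarks);
    /// // `vnode_watermarks` now only contains non-regressed watermarks
    /// ```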
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

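/// Extracts the read snapshot for a read of `table_id` at `epoch` from
/// `read_version`: the overlapping imms and staging ssts plus the committed
/// version, with `table_key_range` rewritten against the table watermark index
/// if one exists.
///
/// A sketch of the calling side (names are illustrative):
///
/// ```ignore
/// let (key_range, (imms, ssts, committed)) =
///     read_filter_for_version(epoch, table_id, key_range, &read_version)?;
/// ```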
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

/// Use `HummockVersionReader` to reuse the `get` and `iter` implementations for both
/// `batch_query` and `streaming_query`.
impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
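    /// Point get that consults data sources from newest to oldest: staging imms,
    /// then staging (uncommitted) ssts, then the committed levels, returning at
    /// the first visible version of the key.
    ///
    /// A sketch of the calling side (the closure is illustrative):
    ///
    /// ```ignore
    /// let value = reader
    ///     .get(table_key, epoch, table_id, read_options, read_version_tuple,
    ///         |_full_key, value| Ok(Bytes::copy_from_slice(value)))
    ///     .await?;
    /// ```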
    pub async fn get<O>(
        &self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        // 1. read staging data
        for imm in &imms {
            // skip imms that only hold out-of-date data
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. order guarantee: imm -> sst
        let dist_key_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

        // The epoch passed in is a pure epoch, and we will seek the constructed `full_key` later.
        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`; otherwise, the iterator might skip keys with a spill offset greater than 0.
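        // (Illustrative note: within the same pure epoch, a greater spill offset
        // encodes to a greater `EpochWithGap`, which sorts earlier in the
        // newest-first key order, so seeking with `MAX_SPILL_TIMES` covers every
        // spill offset of `epoch`.)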
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        let single_table_key_range = table_key.clone()..=table_key.clone();
        // 3. read from committed_version sst file
        // Because SST meta records encoded key range,
        // the filter key needs to be encoded as well.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the case that the key falls into the gap between two ssts
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

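    /// Builds a forward iterator over the merged view of imms, uncommitted ssts
    /// and committed ssts; equivalent to `iter_with_memtable` with no memtable
    /// iterator.
    ///
    /// A sketch of the calling side (iteration through the state-store iterator
    /// interface is illustrative):
    ///
    /// ```ignore
    /// let mut iter = reader
    ///     .iter(key_range, epoch, table_id, read_options, read_version_tuple)
    ///     .await?;
    /// while let Some((full_key, value)) = iter.try_next().await? {
    ///     // consume the key-value pair
    /// }
    /// ```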
    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the left bound of the epoch range for this iterator read
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the left bound of the epoch range for this iterator read
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
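        // 1. build iterators from the staging imms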
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterator from committed
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // encode once
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos = prune_nonoverlapping_ssts(
                    &level.table_infos,
                    user_key_range_ref,
                    table_id.table_id(),
                )
                .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

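    /// Builds a `ChangeLogIterator` over the change log entries of
    /// `options.table_id` whose epochs fall within `epoch_range`, merging the
    /// new-value and old-value ssts that overlap with `key_range`.
    ///
    /// A sketch of the calling side (names are illustrative):
    ///
    /// ```ignore
    /// let iter = reader
    ///     .iter_log(version, (min_epoch, max_epoch), key_range, options)
    ///     .await?;
    /// ```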
    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = options.table_id.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

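    /// Returns the `top_n` items nearest to `target` in the vector index of
    /// `table_id`, or an empty result if the table has no vector index.
    ///
    /// A sketch of the calling side (`MyMeasure` is an illustrative
    /// `MeasureDistanceBuilder` implementation, not an actual type here):
    ///
    /// ```ignore
    /// let items = reader
    ///     .nearest::<MyMeasure, _>(version, table_id, target, options, on_nearest)
    ///     .await?;
    /// ```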
    pub async fn nearest<M: MeasureDistanceBuilder, O: Send>(
        &self,
        version: PinnedVersion,
        table_id: TableId,
        target: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target.to_ref(), options.top_n);
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                Ok(builder.finish())
            }
        }
    }
}