risingwave_storage/hummock/store/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::vec_deque::VecDeque;
17use std::collections::{Bound, HashMap};
18use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use futures::future::try_join_all;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
39use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
40use risingwave_pb::hummock::LevelType;
41use sync_point::sync_point;
42use tracing::warn;
43
44use crate::error::StorageResult;
45use crate::hummock::event_handler::LocalInstanceId;
46use crate::hummock::iterator::change_log::ChangeLogIterator;
47use crate::hummock::iterator::{
48    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
49};
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
52use crate::hummock::sstable_store::SstableStoreRef;
53use crate::hummock::utils::{
54    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
55    search_sst_idx,
56};
57use crate::hummock::vector::file::FileVectorStore;
58use crate::hummock::{
59    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
60    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
61    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
62    hit_sstable_bloom_filter,
63};
64use crate::mem_table::{
65    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
66};
67use crate::monitor::{
68    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
69};
70use crate::store::{
71    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
72};
73use crate::vector::hnsw::nearest;
74use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
75
76pub type CommittedVersion = PinnedVersion;
77
78/// Data not committed to Hummock. There are two types of staging data:
79/// - Immutable memtable: data that has been written into local state store but not persisted.
80/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
81///   hummock version.
82
83#[derive(Clone, Debug, PartialEq)]
84pub struct StagingSstableInfo {
85    // newer data comes first
86    sstable_infos: Vec<LocalSstableInfo>,
87    old_value_sstable_infos: Vec<LocalSstableInfo>,
88    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
89    /// The field must not be empty.
90    epochs: Vec<HummockEpoch>,
91    // newer data at the front
92    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
93    imm_size: usize,
94}
95
96impl StagingSstableInfo {
97    pub fn new(
98        sstable_infos: Vec<LocalSstableInfo>,
99        old_value_sstable_infos: Vec<LocalSstableInfo>,
100        epochs: Vec<HummockEpoch>,
101        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
102        imm_size: usize,
103    ) -> Self {
104        // the epochs are sorted from higher epoch to lower epoch
105        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
106        Self {
107            sstable_infos,
108            old_value_sstable_infos,
109            epochs,
110            imm_ids,
111            imm_size,
112        }
113    }
114
115    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
116        &self.sstable_infos
117    }
118
119    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
120        &self.old_value_sstable_infos
121    }
122
123    pub fn imm_size(&self) -> usize {
124        self.imm_size
125    }
126
127    pub fn epochs(&self) -> &Vec<HummockEpoch> {
128        &self.epochs
129    }
130
131    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
132        &self.imm_ids
133    }
134}
135
/// An update that can be applied to a `HummockReadVersion` through
/// `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// A new batch of staging SSTs. Applying it removes the imms listed for the instance from
    /// the uploading imms of the read version and adds the batch to its staging SSTs.
    Sst(Arc<StagingSstableInfo>),
    /// A new committed version. Applying it replaces the committed version of the read
    /// version and cleans up staging data according to the committed epoch of the table.
    CommittedSnapshot(CommittedVersion),
    /// New vnode watermarks of `epoch`, to be added to the table watermark index of the read
    /// version. `HummockReadVersion::update` asserts that `watermark_type` is
    /// `WatermarkSerdeType::PkPrefix`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
146
147#[derive(Clone)]
148pub struct StagingVersion {
149    pending_imm_size: usize,
150    /// It contains the imms added but not sent to the uploader of hummock event handler.
151    /// It is non-empty only when `upload_on_flush` is false.
152    ///
153    /// It will be sent to the uploader when `pending_imm_size` exceed threshold or on `seal_current_epoch`.
154    ///
155    /// newer data comes last
156    pub pending_imms: Vec<ImmutableMemtable>,
157    /// It contains the imms already sent to uploader of hummock event handler.
158    /// Note: Currently, building imm and writing to staging version is not atomic, and therefore
159    /// imm of smaller batch id may be added later than one with greater batch id
160    ///
161    /// Newer data comes first.
162    pub uploading_imms: VecDeque<ImmutableMemtable>,
163
164    // newer data comes first
165    pub sst: VecDeque<Arc<StagingSstableInfo>>,
166}
167
168impl StagingVersion {
169    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
170    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
171    pub fn prune_overlap<'a>(
172        &'a self,
173        max_epoch_inclusive: HummockEpoch,
174        table_id: TableId,
175        table_key_range: &'a TableKeyRange,
176    ) -> (
177        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
178        impl Iterator<Item = &'a SstableInfo> + 'a,
179    ) {
180        let (left, right) = table_key_range;
181        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
182        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
183        let overlapped_imms = self
184            .pending_imms
185            .iter()
186            .rev() // rev to let newer imm come first
187            .chain(self.uploading_imms.iter())
188            .filter(move |imm| {
189                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
190                imm.min_epoch() <= max_epoch_inclusive
191                    && imm.table_id == table_id
192                    && range_overlap(
193                        &(left, right),
194                        &imm.start_table_key(),
195                        Bound::Included(&imm.end_table_key()),
196                    )
197            });
198
199        // TODO: Remove duplicate sst based on sst id
200        let overlapped_ssts = self
201            .sst
202            .iter()
203            .filter(move |staging_sst| {
204                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
205                sst_max_epoch <= max_epoch_inclusive
206            })
207            .flat_map(move |staging_sst| {
208                // TODO: sstable info should be concat-able after each streaming table owns a read
209                // version. May use concat sstable iter instead in some cases.
210                staging_sst
211                    .sstable_infos
212                    .iter()
213                    .map(|sstable| &sstable.sst_info)
214                    .filter(move |sstable: &&SstableInfo| {
215                        filter_single_sst(sstable, table_id, table_key_range)
216                    })
217            });
218        (overlapped_imms, overlapped_ssts)
219    }
220
221    pub fn is_empty(&self) -> bool {
222        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
223    }
224}
225
226#[derive(Clone)]
227/// A container of information required for reading from hummock.
228pub struct HummockReadVersion {
229    table_id: TableId,
230    instance_id: LocalInstanceId,
231
232    /// Local version for staging data.
233    staging: StagingVersion,
234
235    /// Remote version for committed data.
236    committed: CommittedVersion,
237
238    /// Indicate if this is replicated. If it is, we should ignore it during
239    /// global state store read, to avoid duplicated results.
240    /// Otherwise for local state store, it is fine, see we will see the
241    /// `ReadVersion` just for that local state store.
242    is_replicated: bool,
243
244    table_watermarks: Option<PkPrefixTableWatermarksIndex>,
245
246    // Vnode bitmap corresponding to the read version
247    // It will be initialized after local state store init
248    vnodes: Arc<Bitmap>,
249}
250
251impl HummockReadVersion {
252    pub fn new_with_replication_option(
253        table_id: TableId,
254        instance_id: LocalInstanceId,
255        committed_version: CommittedVersion,
256        is_replicated: bool,
257        vnodes: Arc<Bitmap>,
258    ) -> Self {
259        // before build `HummockReadVersion`, we need to get the a initial version which obtained
260        // from meta. want this initialization after version is initialized (now with
261        // notification), so add a assert condition to guarantee correct initialization order
262        assert!(committed_version.is_valid());
263        Self {
264            table_id,
265            instance_id,
266            table_watermarks: {
267                match committed_version.table_watermarks.get(&table_id) {
268                    Some(table_watermarks) => match table_watermarks.watermark_type {
269                        WatermarkSerdeType::PkPrefix => {
270                            Some(PkPrefixTableWatermarksIndex::new_committed(
271                                table_watermarks.clone(),
272                                committed_version
273                                    .state_table_info
274                                    .info()
275                                    .get(&table_id)
276                                    .expect("should exist")
277                                    .committed_epoch,
278                            ))
279                        }
280
281                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
282                    },
283                    None => None,
284                }
285            },
286            staging: StagingVersion {
287                pending_imm_size: 0,
288                pending_imms: Vec::default(),
289                uploading_imms: VecDeque::default(),
290                sst: VecDeque::default(),
291            },
292
293            committed: committed_version,
294
295            is_replicated,
296            vnodes,
297        }
298    }
299
300    pub fn new(
301        table_id: TableId,
302        instance_id: LocalInstanceId,
303        committed_version: CommittedVersion,
304        vnodes: Arc<Bitmap>,
305    ) -> Self {
306        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
307    }
308
309    pub fn table_id(&self) -> TableId {
310        self.table_id
311    }
312
313    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
314        if let Some(item) = self
315            .staging
316            .pending_imms
317            .last()
318            .or_else(|| self.staging.uploading_imms.front())
319        {
320            // check batch_id order from newest to old
321            debug_assert!(item.batch_id() < imm.batch_id());
322        }
323
324        self.staging.pending_imm_size += imm.size();
325        self.staging.pending_imms.push(imm);
326    }
327
328    pub fn pending_imm_size(&self) -> usize {
329        self.staging.pending_imm_size
330    }
331
332    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
333        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
334        for imm in &pending_imms {
335            self.staging.uploading_imms.push_front(imm.clone());
336        }
337        self.staging.pending_imm_size = 0;
338        pending_imms
339    }
340
341    /// Updates the read version with `VersionUpdate`.
342    /// There will be three data types to be processed
343    /// `VersionUpdate::Staging`
344    ///     - `StagingData::ImmMem` -> Insert into memory's `staging_imm`
345    ///     - `StagingData::Sst` -> Update the sst to memory's `staging_sst` and remove the
346    ///       corresponding `staging_imms` according to the `batch_id`
347    /// `VersionUpdate::CommittedDelta` -> Unimplemented yet
348    /// `VersionUpdate::CommittedSnapshot` -> Update `committed_version` , and clean up related
349    /// `staging_sst` and `staging_imm` in memory according to epoch
350    pub fn update(&mut self, info: VersionUpdate) {
351        match info {
352            VersionUpdate::Sst(staging_sst_ref) => {
353                {
354                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
355                        warn!(
356                            instance_id = self.instance_id,
357                            "no related imm in sst input"
358                        );
359                        return;
360                    };
361
362                    // old data comes first
363                    for imm_id in imms.iter().rev() {
364                        let check_err = match self.staging.uploading_imms.pop_back() {
365                            None => Some("empty".to_owned()),
366                            Some(prev_imm_id) => {
367                                if prev_imm_id.batch_id() == *imm_id {
368                                    None
369                                } else {
370                                    Some(format!(
371                                        "miss match id {} {}",
372                                        prev_imm_id.batch_id(),
373                                        *imm_id
374                                    ))
375                                }
376                            }
377                        };
378                        assert!(
379                            check_err.is_none(),
380                            "should be valid staging_sst.size {},
381                                    staging_sst.imm_ids {:?},
382                                    staging_sst.epochs {:?},
383                                    local_pending_imm_ids {:?},
384                                    local_uploading_imm_ids {:?},
385                                    instance_id {}
386                                    check_err {:?}",
387                            staging_sst_ref.imm_size,
388                            staging_sst_ref.imm_ids,
389                            staging_sst_ref.epochs,
390                            self.staging
391                                .pending_imms
392                                .iter()
393                                .map(|imm| imm.batch_id())
394                                .collect_vec(),
395                            self.staging
396                                .uploading_imms
397                                .iter()
398                                .map(|imm| imm.batch_id())
399                                .collect_vec(),
400                            self.instance_id,
401                            check_err
402                        );
403                    }
404
405                    self.staging.sst.push_front(staging_sst_ref);
406                }
407            }
408
409            VersionUpdate::CommittedSnapshot(committed_version) => {
410                if let Some(info) = committed_version
411                    .state_table_info
412                    .info()
413                    .get(&self.table_id)
414                {
415                    let committed_epoch = info.committed_epoch;
416                    if self.is_replicated {
417                        self.staging
418                            .uploading_imms
419                            .retain(|imm| imm.min_epoch() > committed_epoch);
420                        self.staging
421                            .pending_imms
422                            .retain(|imm| imm.min_epoch() > committed_epoch);
423                    } else {
424                        self.staging
425                            .pending_imms
426                            .iter()
427                            .chain(self.staging.uploading_imms.iter())
428                            .for_each(|imm| {
429                                assert!(
430                                    imm.min_epoch() > committed_epoch,
431                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
432                                    imm.table_id,
433                                    imm.min_epoch(),
434                                    committed_epoch
435                                )
436                            });
437                    }
438
439                    self.staging.sst.retain(|sst| {
440                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
441                    });
442
443                    // check epochs.last() > MCE
444                    assert!(self.staging.sst.iter().all(|sst| {
445                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
446                    }));
447
448                    if let Some(committed_watermarks) =
449                        committed_version.table_watermarks.get(&self.table_id)
450                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
451                    {
452                        if let Some(watermark_index) = &mut self.table_watermarks {
453                            watermark_index.apply_committed_watermarks(
454                                committed_watermarks.clone(),
455                                committed_epoch,
456                            );
457                        } else {
458                            self.table_watermarks =
459                                Some(PkPrefixTableWatermarksIndex::new_committed(
460                                    committed_watermarks.clone(),
461                                    committed_epoch,
462                                ));
463                        }
464                    }
465                }
466
467                self.committed = committed_version;
468            }
469            VersionUpdate::NewTableWatermark {
470                direction,
471                epoch,
472                vnode_watermarks,
473                watermark_type,
474            } => {
475                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
476                if let Some(watermark_index) = &mut self.table_watermarks {
477                    watermark_index.add_epoch_watermark(
478                        epoch,
479                        Arc::from(vnode_watermarks),
480                        direction,
481                    );
482                } else {
483                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
484                        direction,
485                        epoch,
486                        vnode_watermarks,
487                        self.committed.table_committed_epoch(self.table_id),
488                    ));
489                }
490            }
491        }
492    }
493
494    pub fn staging(&self) -> &StagingVersion {
495        &self.staging
496    }
497
498    pub fn committed(&self) -> &CommittedVersion {
499        &self.committed
500    }
501
502    /// We have assumption that the watermark is increasing monotonically. Therefore,
503    /// here if the upper layer usage has passed an regressed watermark, we should
504    /// filter out the regressed watermark. Currently the kv log store may write
505    /// regressed watermark
506    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
507        if let Some(watermark_index) = &self.table_watermarks {
508            watermark_index.filter_regress_watermarks(watermarks)
509        }
510    }
511
512    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
513        self.table_watermarks
514            .as_ref()
515            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
516    }
517
518    pub fn is_replicated(&self) -> bool {
519        self.is_replicated
520    }
521
522    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
523        std::mem::replace(&mut self.vnodes, vnodes)
524    }
525
526    pub fn contains(&self, vnode: VirtualNode) -> bool {
527        self.vnodes.is_set(vnode.to_index())
528    }
529
530    pub fn vnodes(&self) -> Arc<Bitmap> {
531        self.vnodes.clone()
532    }
533}
534
535pub fn read_filter_for_version(
536    epoch: HummockEpoch,
537    table_id: TableId,
538    mut table_key_range: TableKeyRange,
539    read_version: &RwLock<HummockReadVersion>,
540) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
541    let read_version_guard = read_version.read();
542
543    let committed_version = read_version_guard.committed().clone();
544
545    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
546        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
547    }
548
549    let (imm_iter, sst_iter) =
550        read_version_guard
551            .staging()
552            .prune_overlap(epoch, table_id, &table_key_range);
553
554    let imms = imm_iter.cloned().collect();
555    let ssts = sst_iter.cloned().collect();
556
557    Ok((table_key_range, (imms, ssts, committed_version)))
558}
559
/// Reader that serves reads on top of a read version tuple, backed by an sstable store.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store used to access the sstables.
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// NOTE(review): presumably the number of retries when preloading sstables; it is only
    /// stored in this part of the file — confirm against its use.
    preload_retry_times: usize,
}

/// Use `HummockVersionReader` to reuse the `get` and `iter` implementations for both
/// `batch_query` and `streaming_query`.
impl HummockVersionReader {
    /// Creates a reader from its parts.
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    /// The state store metrics this reader reports to.
    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}
588
/// A duration threshold in seconds.
/// NOTE(review): judging by the name, fetching sstable metadata for an iterator is considered
/// slow beyond this duration; its use is outside this part of the file — confirm.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
590
591impl HummockVersionReader {
    /// Point lookup of `table_key` of `table_id` at `epoch`.
    ///
    /// The sources in `read_version_tuple` are probed in order: the imms, the uncommitted
    /// ssts, and then the levels of the committed version. The first source that yields an
    /// entry for the key decides the result:
    /// - if the pure epoch of the entry is smaller than the min epoch derived from `epoch`
    ///   and `read_options.retention_seconds`, the result is `None`;
    /// - otherwise, if the entry converts to a user value, `on_key_value_fn` is called with
    ///   the full key and the value, and its output is returned; if it does not convert to a
    ///   user value, the result is `None`.
    ///
    /// Returns `Ok(None)` when no source yields an entry.
    ///
    /// # Errors
    ///
    /// Propagates the errors of reading the sstables and of `on_key_value_fn`.
    ///
    /// # Panics
    ///
    /// Panics if the committed version is not valid.
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        // Set optimistically: every early return below keeps it, and it is only reset right
        // before the final `Ok(None)`.
        local_stats.found_key = true;

        // 1. read staging data
        for imm in &imms {
            // skip imm that only holding out-of-date data
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                // The first hit decides the result, even when it is out of retention.
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. order guarantee: imm -> sst
        // Bloom filter hash of the prefix hint, if the caller provided one.
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        // A range containing only `table_key`, used to prune the overlapping levels.
        let single_table_key_range = table_key.clone()..=table_key.clone();
        // 3. read from committed_version sst file
        // Because SST meta records encoded key range,
        // the filter key needs to be encoded as well.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    // Several ssts of the level may cover the key; probe each candidate.
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    // At most one sst of the level is probed: the one right before the index
                    // returned by `search_sst_idx`. An index of 0 leaves no such sst.
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the case that the key falls into the gap between two ssts
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        // No source contains the key.
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }
792
793    pub async fn iter(
794        &self,
795        table_key_range: TableKeyRange,
796        epoch: u64,
797        table_id: TableId,
798        read_options: ReadOptions,
799        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
800    ) -> StorageResult<HummockStorageIterator> {
801        self.iter_with_memtable(
802            table_key_range,
803            epoch,
804            table_id,
805            read_options,
806            read_version_tuple,
807            None,
808        )
809        .await
810    }
811
812    pub async fn iter_with_memtable<'b>(
813        &self,
814        table_key_range: TableKeyRange,
815        epoch: u64,
816        table_id: TableId,
817        read_options: ReadOptions,
818        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
819        memtable_iter: Option<MemTableHummockIterator<'b>>,
820    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
821        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
822        let user_key_range = (
823            user_key_range_ref.0.map(|key| key.cloned()),
824            user_key_range_ref.1.map(|key| key.cloned()),
825        );
826        let mut factory = ForwardIteratorFactory::default();
827        let mut local_stats = StoreLocalStatistic::default();
828        let (imms, uncommitted_ssts, committed) = read_version_tuple;
829        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
830        self.iter_inner(
831            table_key_range,
832            epoch,
833            table_id,
834            read_options,
835            imms,
836            uncommitted_ssts,
837            &committed,
838            &mut local_stats,
839            &mut factory,
840        )
841        .await?;
842        let merge_iter = factory.build(memtable_iter);
843        // the epoch_range left bound for iterator read
844        let mut user_iter = UserIterator::new(
845            merge_iter,
846            user_key_range,
847            epoch,
848            min_epoch,
849            Some(committed),
850        );
851        user_iter.rewind().await?;
852        Ok(HummockStorageIteratorInner::new(
853            user_iter,
854            self.state_store_metrics.clone(),
855            table_id,
856            local_stats,
857        ))
858    }
859
860    pub async fn rev_iter<'b>(
861        &self,
862        table_key_range: TableKeyRange,
863        epoch: u64,
864        table_id: TableId,
865        read_options: ReadOptions,
866        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
867        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
868    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
869        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
870        let user_key_range = (
871            user_key_range_ref.0.map(|key| key.cloned()),
872            user_key_range_ref.1.map(|key| key.cloned()),
873        );
874        let mut factory = BackwardIteratorFactory::default();
875        let mut local_stats = StoreLocalStatistic::default();
876        let (imms, uncommitted_ssts, committed) = read_version_tuple;
877        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
878        self.iter_inner(
879            table_key_range,
880            epoch,
881            table_id,
882            read_options,
883            imms,
884            uncommitted_ssts,
885            &committed,
886            &mut local_stats,
887            &mut factory,
888        )
889        .await?;
890        let merge_iter = factory.build(memtable_iter);
891        // the epoch_range left bound for iterator read
892        let mut user_iter = BackwardUserIterator::new(
893            merge_iter,
894            user_key_range,
895            epoch,
896            min_epoch,
897            Some(committed),
898        );
899        user_iter.rewind().await?;
900        Ok(HummockStorageRevIteratorInner::new(
901            user_iter,
902            self.state_store_metrics.clone(),
903            table_id,
904            local_stats,
905        ))
906    }
907
908    async fn iter_inner<F: IteratorFactory>(
909        &self,
910        table_key_range: TableKeyRange,
911        epoch: u64,
912        table_id: TableId,
913        read_options: ReadOptions,
914        imms: Vec<ImmutableMemtable>,
915        uncommitted_ssts: Vec<SstableInfo>,
916        committed: &CommittedVersion,
917        local_stats: &mut StoreLocalStatistic,
918        factory: &mut F,
919    ) -> StorageResult<()> {
920        {
921            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
922                match bound {
923                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
924                    Bound::Unbounded => None,
925                }
926            }
927            let (left, right) = &table_key_range;
928            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
929                && right < left
930            {
931                if cfg!(debug_assertions) {
932                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
933                } else {
934                    return Err(HummockError::other(format!(
935                        "invalid iter key range: {table_id} {left:?} {right:?}"
936                    ))
937                    .into());
938                }
939            }
940        }
941
942        local_stats.staging_imm_iter_count = imms.len() as u64;
943        for imm in imms {
944            factory.add_batch_iter(imm);
945        }
946
947        // 2. build iterator from committed
948        // Because SST meta records encoded key range,
949        // the filter key range needs to be encoded as well.
950        let user_key_range = bound_table_key_range(table_id, &table_key_range);
951        let user_key_range_ref = (
952            user_key_range.0.as_ref().map(UserKey::as_ref),
953            user_key_range.1.as_ref().map(UserKey::as_ref),
954        );
955        let mut staging_sst_iter_count = 0;
956        // encode once
957        let bloom_filter_prefix_hash = read_options
958            .prefix_hint
959            .as_ref()
960            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
961        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
962        if read_options.prefetch_options.prefetch {
963            sst_read_options.must_iterated_end_user_key =
964                Some(user_key_range.1.map(|key| key.cloned()));
965            sst_read_options.max_preload_retry_times = self.preload_retry_times;
966        }
967        let sst_read_options = Arc::new(sst_read_options);
968        for sstable_info in &uncommitted_ssts {
969            let table_holder = self
970                .sstable_store
971                .sstable(sstable_info, local_stats)
972                .await?;
973
974            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
975                && !hit_sstable_bloom_filter(
976                    &table_holder,
977                    &user_key_range_ref,
978                    *prefix_hash,
979                    local_stats,
980                )
981            {
982                continue;
983            }
984
985            staging_sst_iter_count += 1;
986            factory.add_staging_sst_iter(F::SstableIteratorType::create(
987                table_holder,
988                self.sstable_store.clone(),
989                sst_read_options.clone(),
990                sstable_info,
991            ));
992        }
993        local_stats.staging_sst_iter_count = staging_sst_iter_count;
994
995        let timer = Instant::now();
996
997        for level in committed.levels(table_id) {
998            if level.table_infos.is_empty() {
999                continue;
1000            }
1001
1002            if level.level_type == LevelType::Nonoverlapping {
1003                let mut table_infos =
1004                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
1005                        .peekable();
1006
1007                if table_infos.peek().is_none() {
1008                    continue;
1009                }
1010                let sstable_infos = table_infos.cloned().collect_vec();
1011                if sstable_infos.len() > 1 {
1012                    factory.add_concat_sst_iter(
1013                        sstable_infos,
1014                        self.sstable_store.clone(),
1015                        sst_read_options.clone(),
1016                    );
1017                    local_stats.non_overlapping_iter_count += 1;
1018                } else {
1019                    let sstable = self
1020                        .sstable_store
1021                        .sstable(&sstable_infos[0], local_stats)
1022                        .await?;
1023
1024                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
1025                        && !hit_sstable_bloom_filter(
1026                            &sstable,
1027                            &user_key_range_ref,
1028                            *dist_hash,
1029                            local_stats,
1030                        )
1031                    {
1032                        continue;
1033                    }
1034                    // Since there is only one sst to be included for the current non-overlapping
1035                    // level, there is no need to create a ConcatIterator on it.
1036                    // We put the SstableIterator in `overlapping_iters` just for convenience since
1037                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
1038                    // it in `non_overlapping_iter_count`.
1039                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
1040                        sstable,
1041                        self.sstable_store.clone(),
1042                        sst_read_options.clone(),
1043                        &sstable_infos[0],
1044                    ));
1045                    local_stats.non_overlapping_iter_count += 1;
1046                }
1047            } else {
1048                let table_infos =
1049                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
1050                // Overlapping
1051                let fetch_meta_req = table_infos.rev().collect_vec();
1052                if fetch_meta_req.is_empty() {
1053                    continue;
1054                }
1055                for sstable_info in fetch_meta_req {
1056                    let sstable = self
1057                        .sstable_store
1058                        .sstable(sstable_info, local_stats)
1059                        .await?;
1060                    assert_eq!(sstable_info.object_id, sstable.id);
1061                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
1062                        && !hit_sstable_bloom_filter(
1063                            &sstable,
1064                            &user_key_range_ref,
1065                            *dist_hash,
1066                            local_stats,
1067                        )
1068                    {
1069                        continue;
1070                    }
1071                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
1072                        sstable,
1073                        self.sstable_store.clone(),
1074                        sst_read_options.clone(),
1075                        sstable_info,
1076                    ));
1077                    local_stats.overlapping_iter_count += 1;
1078                }
1079            }
1080        }
1081        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
1082        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
1083            let table_id_string = table_id.to_string();
1084            tracing::warn!(
1085                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
1086                table_id_string,
1087                epoch,
1088                fetch_meta_duration_sec,
1089                local_stats.cache_meta_block_miss
1090            );
1091            self.state_store_metrics
1092                .iter_slow_fetch_meta_cache_unhits
1093                .set(local_stats.cache_meta_block_miss as i64);
1094        }
1095        Ok(())
1096    }
1097
1098    pub async fn iter_log(
1099        &self,
1100        version: PinnedVersion,
1101        epoch_range: (u64, u64),
1102        key_range: TableKeyRange,
1103        options: ReadLogOptions,
1104    ) -> HummockResult<ChangeLogIterator> {
1105        let change_log = {
1106            let table_change_logs = version.table_change_log_read_lock();
1107            if let Some(change_log) = table_change_logs.get(&options.table_id) {
1108                change_log.filter_epoch(epoch_range).cloned().collect_vec()
1109            } else {
1110                Vec::new()
1111            }
1112        };
1113
1114        if let Some(max_epoch_change_log) = change_log.last() {
1115            let (_, max_epoch) = epoch_range;
1116            if !max_epoch_change_log.epochs().contains(&max_epoch) {
1117                warn!(
1118                    max_epoch,
1119                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
1120                    table_id = %options.table_id,
1121                    "max_epoch does not exist"
1122                );
1123            }
1124        }
1125        let read_options = Arc::new(SstableIteratorReadOptions {
1126            cache_policy: Default::default(),
1127            must_iterated_end_user_key: None,
1128            max_preload_retry_times: 0,
1129            prefetch_for_large_query: false,
1130        });
1131
1132        async fn make_iter(
1133            sstable_infos: impl Iterator<Item = &SstableInfo>,
1134            sstable_store: &SstableStoreRef,
1135            read_options: Arc<SstableIteratorReadOptions>,
1136            local_stat: &mut StoreLocalStatistic,
1137        ) -> HummockResult<MergeIterator<SstableIterator>> {
1138            let iters = try_join_all(sstable_infos.map(|sstable_info| {
1139                let sstable_store = sstable_store.clone();
1140                let read_options = read_options.clone();
1141                async move {
1142                    let mut local_stat = StoreLocalStatistic::default();
1143                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1144                    Ok::<_, HummockError>((
1145                        SstableIterator::new(
1146                            table_holder,
1147                            sstable_store,
1148                            read_options,
1149                            sstable_info,
1150                        ),
1151                        local_stat,
1152                    ))
1153                }
1154            }))
1155            .await?;
1156            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1157                |(iter, stats)| {
1158                    local_stat.add(&stats);
1159                    iter
1160                },
1161            )))
1162        }
1163
1164        let mut local_stat = StoreLocalStatistic::default();
1165
1166        let new_value_iter = make_iter(
1167            change_log
1168                .iter()
1169                .flat_map(|log| log.new_value.iter())
1170                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1171            &self.sstable_store,
1172            read_options.clone(),
1173            &mut local_stat,
1174        )
1175        .await?;
1176        let old_value_iter = make_iter(
1177            change_log
1178                .iter()
1179                .flat_map(|log| log.old_value.iter())
1180                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1181            &self.sstable_store,
1182            read_options.clone(),
1183            &mut local_stat,
1184        )
1185        .await?;
1186        ChangeLogIterator::new(
1187            epoch_range,
1188            key_range,
1189            new_value_iter,
1190            old_value_iter,
1191            options.table_id,
1192            IterLocalMetricsGuard::new(
1193                self.state_store_metrics.clone(),
1194                options.table_id,
1195                local_stat,
1196            ),
1197        )
1198        .await
1199    }
1200
1201    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
1202        &'a self,
1203        version: PinnedVersion,
1204        table_id: TableId,
1205        target: VectorRef<'a>,
1206        options: VectorNearestOptions,
1207        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1208    ) -> HummockResult<Vec<O>> {
1209        let Some(index) = version.vector_indexes.get(&table_id) else {
1210            return Ok(vec![]);
1211        };
1212        if target.dimension() != index.dimension {
1213            return Err(HummockError::other(format!(
1214                "target dimension {} not match index dimension {}",
1215                target.dimension(),
1216                index.dimension
1217            )));
1218        }
1219        match &index.inner {
1220            VectorIndexImpl::Flat(flat) => {
1221                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
1222                for vector_file in &flat.vector_store_info.vector_files {
1223                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
1224                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
1225                        let block = self
1226                            .sstable_store
1227                            .get_vector_block(vector_file, i, block_meta)
1228                            .await?;
1229                        builder.add(&**block, &on_nearest_item_fn);
1230                    }
1231                }
1232                Ok(builder.finish())
1233            }
1234            VectorIndexImpl::HnswFlat(hnsw_flat) => {
1235                let Some(graph_file) = &hnsw_flat.graph_file else {
1236                    return Ok(vec![]);
1237                };
1238
1239                let graph = self.sstable_store.get_hnsw_graph(graph_file).await?;
1240
1241                let vector_store =
1242                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
1243                let (items, _stats) = nearest::<O, M>(
1244                    &vector_store,
1245                    &*graph,
1246                    target,
1247                    on_nearest_item_fn,
1248                    options.hnsw_ef_search,
1249                    options.top_n,
1250                )
1251                .await?;
1252                Ok(items)
1253            }
1254        }
1255    }
1256}