risingwave_storage/hummock/store/version.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::FileVectorStore;
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, Vector, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

/// Data not committed to Hummock. There are two types of staging data:
/// - Immutable memtable: data that has been written into local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   hummock version.

#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
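    /// Constructs a `StagingSstableInfo`. `epochs` must be sorted from the newest epoch to the
    /// oldest, as enforced by the assertion below. A minimal illustrative sketch (all variable
    /// names here are made up for the example):
    /// ```ignore
    /// // epoch2 is newer than epoch1, so it comes first
    /// let staging = StagingSstableInfo::new(ssts, old_value_ssts, vec![epoch2, epoch1], imm_ids, size);
    /// ```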
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // the epochs are sorted from higher epoch to lower epoch
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    /// Imms that have been added but not yet sent to the uploader of the hummock event handler.
    /// It is non-empty only when `upload_on_flush` is false.
    ///
    /// The imms are sent to the uploader when `pending_imm_size` exceeds the threshold or on
    /// `seal_current_epoch`.
    ///
    /// Newer data comes last.
    pub pending_imms: Vec<ImmutableMemtable>,
    /// Imms that have already been sent to the uploader of the hummock event handler.
    /// Note: building an imm and writing it to the staging version is currently not atomic,
    /// so an imm with a smaller batch id may be added later than one with a greater batch id.
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    // newer data comes first
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
    /// Get the `imm`s and `sst`s of the given `table_id` that overlap with `table_key_range`
    /// and whose data is visible at `max_epoch_inclusive`.
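    ///
    /// A hedged sketch of a call site, mirroring `read_filter_for_version` (variable names are
    /// illustrative):
    /// ```ignore
    /// let (imm_iter, sst_iter) = staging.prune_overlap(epoch, table_id, &table_key_range);
    /// let imms: Vec<_> = imm_iter.cloned().collect();
    /// let ssts: Vec<_> = sst_iter.cloned().collect();
    /// ```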
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev() // rev to let newer imm come first
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        // TODO: Remove duplicate sst based on sst id
        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                // TODO: sstable info should be concat-able after each streaming table owns a read
                // version. May use concat sstable iter instead in some cases.
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

#[derive(Clone)]
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicates whether this read version is replicated. If it is, it should be ignored during
    /// global state store reads to avoid duplicated results. Reads from the local state store
    /// are unaffected, since each local state store only sees its own `ReadVersion`.
    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version
    // It will be initialized after local state store init
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // Before building a `HummockReadVersion`, we need an initial version obtained from meta.
        // We want this construction to happen only after the version is initialized (now via
        // notification), so assert here to guarantee the correct initialization order.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }

                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            // batch ids must be monotonically increasing: the incoming imm is the newest
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Updates the read version with a `VersionUpdate`. Three kinds of updates are processed:
    /// - `VersionUpdate::Sst`: push the uploaded sst into `staging.sst` and remove the
    ///   corresponding `uploading_imms` according to their batch ids
    /// - `VersionUpdate::CommittedSnapshot`: update `committed_version`, and clean up the
    ///   `staging.sst` entries (and, for replicated versions, the imms) whose data is covered
    ///   by the new committed epoch
    /// - `VersionUpdate::NewTableWatermark`: apply the new watermark to the table watermark
    ///   index
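    ///
    /// A hedged usage sketch (the wrapped value is illustrative):
    /// ```ignore
    /// read_version.update(VersionUpdate::Sst(staging_sst_info));
    /// ```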
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // old data comes first
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // check epochs.last() > MCE
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));
                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// We assume that the watermark increases monotonically. Therefore, if the upper layer
    /// passes in a regressed watermark, we should filter it out. Currently, the kv log store
    /// may write regressed watermarks.
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

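/// Resolves the snapshot for a read: clones the committed version, rewrites the key range
/// with the table watermark index if present, and collects the staging imms and ssts that
/// overlap with the (possibly rewritten) key range.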
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

/// Use `HummockVersionReader` to reuse the `get` and `iter` implementations for both
/// `batch_query` and `streaming_query`.
impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
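    /// Point lookup that follows the read order below: staging imms first, then staging
    /// (uncommitted) ssts, and finally the committed levels. It returns on the first version of
    /// the key found, yielding `None` if that version is older than `min_epoch`.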
    pub async fn get<O>(
        &self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        // 1. read staging data
        for imm in &imms {
            // skip imms that only hold out-of-date data
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. order guarantee: imm -> sst
        let dist_key_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

        // The epoch passed in is a pure epoch, and we will seek the constructed `full_key` later.
        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`; otherwise
        // the iterator might skip keys with a spill offset greater than 0.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        let single_table_key_range = table_key.clone()..=table_key.clone();
        // 3. read from committed_version sst file
        // Because SST meta records encoded key range,
        // the filter key needs to be encoded as well.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
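                    // `search_sst_idx` returns the number of ssts in this level whose start key
                    // is no greater than `full_key`: 0 means the key precedes every sst here, and
                    // otherwise the candidate sst is at `idx - 1`, whose right bound is checked
                    // below to detect the key falling into a gap between ssts.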
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the case that the key falls into the gap between two ssts
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the epoch_range left bound for the iterator read
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the epoch_range left bound for the iterator read
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

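    /// Shared body of the forward and backward iterators: validates the key range, then
    /// registers iterators with the factory in read order, that is, one per staging imm, one
    /// per staging sst that passes the bloom filter, and, for each committed level, either a
    /// concat iterator (non-overlapping) or one iterator per overlapping sst.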
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterator from committed
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // encode once
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos = prune_nonoverlapping_ssts(
                    &level.table_infos,
                    user_key_range_ref,
                    table_id.table_id(),
                )
                .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = options.table_id.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

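    /// Vector nearest-neighbor search. A flat index is scanned exhaustively, block by block;
    /// an HNSW index is searched through its graph via `hnsw::nearest`. Returns an empty
    /// result when the table has no vector index.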
    pub async fn nearest<M: MeasureDistanceBuilder, O: Send>(
        &self,
        version: PinnedVersion,
        table_id: TableId,
        target: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target.to_ref(), options.top_n);
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self.sstable_store.get_vector_file_meta(vector_file).await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let graph = self.sstable_store.get_hnsw_graph(graph_file).await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, _stats) = nearest::<O, M>(
                    &vector_store,
                    &*graph,
                    target.to_ref(),
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                Ok(items)
            }
        }
    }