risingwave_storage/hummock/store/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::vec_deque::VecDeque;
17use std::collections::{Bound, HashMap};
18use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use futures::future::try_join_all;
23use itertools::Itertools;
24use parking_lot::RwLock;
25use risingwave_common::array::VectorRef;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
39use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
40use risingwave_pb::hummock::LevelType;
41use sync_point::sync_point;
42use tracing::warn;
43
44use crate::error::StorageResult;
45use crate::hummock::event_handler::LocalInstanceId;
46use crate::hummock::iterator::change_log::ChangeLogIterator;
47use crate::hummock::iterator::{
48    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
49};
50use crate::hummock::local_version::pinned_version::PinnedVersion;
51use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
52use crate::hummock::sstable_store::SstableStoreRef;
53use crate::hummock::utils::{
54    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
55    search_sst_idx,
56};
57use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
58use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
59use crate::hummock::{
60    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
61    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
62    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
63    hit_sstable_bloom_filter,
64};
65use crate::mem_table::{
66    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
67};
68use crate::monitor::{
69    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
70};
71use crate::store::{
72    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
73};
74use crate::vector::hnsw::nearest;
75use crate::vector::{MeasureDistanceBuilder, NearestBuilder};
76
77pub type CommittedVersion = PinnedVersion;
78
79/// Data not committed to Hummock. There are two types of staging data:
80/// - Immutable memtable: data that has been written into local state store but not persisted.
81/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
82///   hummock version.
83
84#[derive(Clone, Debug, PartialEq)]
85pub struct StagingSstableInfo {
86    // newer data comes first
87    sstable_infos: Vec<LocalSstableInfo>,
88    old_value_sstable_infos: Vec<LocalSstableInfo>,
89    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
90    /// The field must not be empty.
91    epochs: Vec<HummockEpoch>,
92    // newer data at the front
93    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
94    imm_size: usize,
95}
96
97impl StagingSstableInfo {
98    pub fn new(
99        sstable_infos: Vec<LocalSstableInfo>,
100        old_value_sstable_infos: Vec<LocalSstableInfo>,
101        epochs: Vec<HummockEpoch>,
102        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
103        imm_size: usize,
104    ) -> Self {
105        // the epochs are sorted from higher epoch to lower epoch
106        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
107        Self {
108            sstable_infos,
109            old_value_sstable_infos,
110            epochs,
111            imm_ids,
112            imm_size,
113        }
114    }
115
116    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
117        &self.sstable_infos
118    }
119
120    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
121        &self.old_value_sstable_infos
122    }
123
124    pub fn imm_size(&self) -> usize {
125        self.imm_size
126    }
127
128    pub fn epochs(&self) -> &Vec<HummockEpoch> {
129        &self.epochs
130    }
131
132    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
133        &self.imm_ids
134    }
135}
136
137pub enum VersionUpdate {
138    Sst(Arc<StagingSstableInfo>),
139    CommittedSnapshot(CommittedVersion),
140    NewTableWatermark {
141        direction: WatermarkDirection,
142        epoch: HummockEpoch,
143        vnode_watermarks: Vec<VnodeWatermark>,
144        watermark_type: WatermarkSerdeType,
145    },
146}
147
148#[derive(Clone)]
149pub struct StagingVersion {
150    pending_imm_size: usize,
151    /// It contains the imms added but not sent to the uploader of hummock event handler.
152    /// It is non-empty only when `upload_on_flush` is false.
153    ///
154    /// It will be sent to the uploader when `pending_imm_size` exceed threshold or on `seal_current_epoch`.
155    ///
156    /// newer data comes last
157    pub pending_imms: Vec<ImmutableMemtable>,
158    /// It contains the imms already sent to uploader of hummock event handler.
159    /// Note: Currently, building imm and writing to staging version is not atomic, and therefore
160    /// imm of smaller batch id may be added later than one with greater batch id
161    ///
162    /// Newer data comes first.
163    pub uploading_imms: VecDeque<ImmutableMemtable>,
164
165    // newer data comes first
166    pub sst: VecDeque<Arc<StagingSstableInfo>>,
167}
168
169impl StagingVersion {
170    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
171    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
172    pub fn prune_overlap<'a>(
173        &'a self,
174        max_epoch_inclusive: HummockEpoch,
175        table_id: TableId,
176        table_key_range: &'a TableKeyRange,
177    ) -> (
178        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
179        impl Iterator<Item = &'a SstableInfo> + 'a,
180    ) {
181        let (left, right) = table_key_range;
182        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
183        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
184        let overlapped_imms = self
185            .pending_imms
186            .iter()
187            .rev() // rev to let newer imm come first
188            .chain(self.uploading_imms.iter())
189            .filter(move |imm| {
190                // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
191                imm.min_epoch() <= max_epoch_inclusive
192                    && imm.table_id == table_id
193                    && range_overlap(
194                        &(left, right),
195                        &imm.start_table_key(),
196                        Bound::Included(&imm.end_table_key()),
197                    )
198            });
199
200        // TODO: Remove duplicate sst based on sst id
201        let overlapped_ssts = self
202            .sst
203            .iter()
204            .filter(move |staging_sst| {
205                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
206                sst_max_epoch <= max_epoch_inclusive
207            })
208            .flat_map(move |staging_sst| {
209                // TODO: sstable info should be concat-able after each streaming table owns a read
210                // version. May use concat sstable iter instead in some cases.
211                staging_sst
212                    .sstable_infos
213                    .iter()
214                    .map(|sstable| &sstable.sst_info)
215                    .filter(move |sstable: &&SstableInfo| {
216                        filter_single_sst(sstable, table_id, table_key_range)
217                    })
218            });
219        (overlapped_imms, overlapped_ssts)
220    }
221
222    pub fn is_empty(&self) -> bool {
223        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
224    }
225}
226
227#[derive(Clone)]
228/// A container of information required for reading from hummock.
229pub struct HummockReadVersion {
230    table_id: TableId,
231    instance_id: LocalInstanceId,
232
233    /// Local version for staging data.
234    staging: StagingVersion,
235
236    /// Remote version for committed data.
237    committed: CommittedVersion,
238
239    /// Indicate if this is replicated. If it is, we should ignore it during
240    /// global state store read, to avoid duplicated results.
241    /// Otherwise for local state store, it is fine, see we will see the
242    /// `ReadVersion` just for that local state store.
243    is_replicated: bool,
244
245    table_watermarks: Option<PkPrefixTableWatermarksIndex>,
246
247    // Vnode bitmap corresponding to the read version
248    // It will be initialized after local state store init
249    vnodes: Arc<Bitmap>,
250}
251
252impl HummockReadVersion {
253    pub fn new_with_replication_option(
254        table_id: TableId,
255        instance_id: LocalInstanceId,
256        committed_version: CommittedVersion,
257        is_replicated: bool,
258        vnodes: Arc<Bitmap>,
259    ) -> Self {
260        // before build `HummockReadVersion`, we need to get the a initial version which obtained
261        // from meta. want this initialization after version is initialized (now with
262        // notification), so add a assert condition to guarantee correct initialization order
263        assert!(committed_version.is_valid());
264        Self {
265            table_id,
266            instance_id,
267            table_watermarks: {
268                match committed_version.table_watermarks.get(&table_id) {
269                    Some(table_watermarks) => match table_watermarks.watermark_type {
270                        WatermarkSerdeType::PkPrefix => {
271                            Some(PkPrefixTableWatermarksIndex::new_committed(
272                                table_watermarks.clone(),
273                                committed_version
274                                    .state_table_info
275                                    .info()
276                                    .get(&table_id)
277                                    .expect("should exist")
278                                    .committed_epoch,
279                            ))
280                        }
281
282                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
283                    },
284                    None => None,
285                }
286            },
287            staging: StagingVersion {
288                pending_imm_size: 0,
289                pending_imms: Vec::default(),
290                uploading_imms: VecDeque::default(),
291                sst: VecDeque::default(),
292            },
293
294            committed: committed_version,
295
296            is_replicated,
297            vnodes,
298        }
299    }
300
301    pub fn new(
302        table_id: TableId,
303        instance_id: LocalInstanceId,
304        committed_version: CommittedVersion,
305        vnodes: Arc<Bitmap>,
306    ) -> Self {
307        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
308    }
309
310    pub fn table_id(&self) -> TableId {
311        self.table_id
312    }
313
314    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
315        if let Some(item) = self
316            .staging
317            .pending_imms
318            .last()
319            .or_else(|| self.staging.uploading_imms.front())
320        {
321            // check batch_id order from newest to old
322            debug_assert!(item.batch_id() < imm.batch_id());
323        }
324
325        self.staging.pending_imm_size += imm.size();
326        self.staging.pending_imms.push(imm);
327    }
328
329    pub fn pending_imm_size(&self) -> usize {
330        self.staging.pending_imm_size
331    }
332
333    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
334        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
335        for imm in &pending_imms {
336            self.staging.uploading_imms.push_front(imm.clone());
337        }
338        self.staging.pending_imm_size = 0;
339        pending_imms
340    }
341
342    /// Updates the read version with `VersionUpdate`.
343    /// There will be three data types to be processed
344    /// `VersionUpdate::Staging`
345    ///     - `StagingData::ImmMem` -> Insert into memory's `staging_imm`
346    ///     - `StagingData::Sst` -> Update the sst to memory's `staging_sst` and remove the
347    ///       corresponding `staging_imms` according to the `batch_id`
348    /// `VersionUpdate::CommittedDelta` -> Unimplemented yet
349    /// `VersionUpdate::CommittedSnapshot` -> Update `committed_version` , and clean up related
350    /// `staging_sst` and `staging_imm` in memory according to epoch
351    pub fn update(&mut self, info: VersionUpdate) {
352        match info {
353            VersionUpdate::Sst(staging_sst_ref) => {
354                {
355                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
356                        warn!(
357                            instance_id = self.instance_id,
358                            "no related imm in sst input"
359                        );
360                        return;
361                    };
362
363                    // old data comes first
364                    for imm_id in imms.iter().rev() {
365                        let check_err = match self.staging.uploading_imms.pop_back() {
366                            None => Some("empty".to_owned()),
367                            Some(prev_imm_id) => {
368                                if prev_imm_id.batch_id() == *imm_id {
369                                    None
370                                } else {
371                                    Some(format!(
372                                        "miss match id {} {}",
373                                        prev_imm_id.batch_id(),
374                                        *imm_id
375                                    ))
376                                }
377                            }
378                        };
379                        assert!(
380                            check_err.is_none(),
381                            "should be valid staging_sst.size {},
382                                    staging_sst.imm_ids {:?},
383                                    staging_sst.epochs {:?},
384                                    local_pending_imm_ids {:?},
385                                    local_uploading_imm_ids {:?},
386                                    instance_id {}
387                                    check_err {:?}",
388                            staging_sst_ref.imm_size,
389                            staging_sst_ref.imm_ids,
390                            staging_sst_ref.epochs,
391                            self.staging
392                                .pending_imms
393                                .iter()
394                                .map(|imm| imm.batch_id())
395                                .collect_vec(),
396                            self.staging
397                                .uploading_imms
398                                .iter()
399                                .map(|imm| imm.batch_id())
400                                .collect_vec(),
401                            self.instance_id,
402                            check_err
403                        );
404                    }
405
406                    self.staging.sst.push_front(staging_sst_ref);
407                }
408            }
409
410            VersionUpdate::CommittedSnapshot(committed_version) => {
411                if let Some(info) = committed_version
412                    .state_table_info
413                    .info()
414                    .get(&self.table_id)
415                {
416                    let committed_epoch = info.committed_epoch;
417                    if self.is_replicated {
418                        self.staging
419                            .uploading_imms
420                            .retain(|imm| imm.min_epoch() > committed_epoch);
421                        self.staging
422                            .pending_imms
423                            .retain(|imm| imm.min_epoch() > committed_epoch);
424                    } else {
425                        self.staging
426                            .pending_imms
427                            .iter()
428                            .chain(self.staging.uploading_imms.iter())
429                            .for_each(|imm| {
430                                assert!(
431                                    imm.min_epoch() > committed_epoch,
432                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
433                                    imm.table_id,
434                                    imm.min_epoch(),
435                                    committed_epoch
436                                )
437                            });
438                    }
439
440                    self.staging.sst.retain(|sst| {
441                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
442                    });
443
444                    // check epochs.last() > MCE
445                    assert!(self.staging.sst.iter().all(|sst| {
446                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
447                    }));
448
449                    if let Some(committed_watermarks) =
450                        committed_version.table_watermarks.get(&self.table_id)
451                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
452                    {
453                        if let Some(watermark_index) = &mut self.table_watermarks {
454                            watermark_index.apply_committed_watermarks(
455                                committed_watermarks.clone(),
456                                committed_epoch,
457                            );
458                        } else {
459                            self.table_watermarks =
460                                Some(PkPrefixTableWatermarksIndex::new_committed(
461                                    committed_watermarks.clone(),
462                                    committed_epoch,
463                                ));
464                        }
465                    }
466                }
467
468                self.committed = committed_version;
469            }
470            VersionUpdate::NewTableWatermark {
471                direction,
472                epoch,
473                vnode_watermarks,
474                watermark_type,
475            } => {
476                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
477                if let Some(watermark_index) = &mut self.table_watermarks {
478                    watermark_index.add_epoch_watermark(
479                        epoch,
480                        Arc::from(vnode_watermarks),
481                        direction,
482                    );
483                } else {
484                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
485                        direction,
486                        epoch,
487                        vnode_watermarks,
488                        self.committed.table_committed_epoch(self.table_id),
489                    ));
490                }
491            }
492        }
493    }
494
495    pub fn staging(&self) -> &StagingVersion {
496        &self.staging
497    }
498
499    pub fn committed(&self) -> &CommittedVersion {
500        &self.committed
501    }
502
503    /// We have assumption that the watermark is increasing monotonically. Therefore,
504    /// here if the upper layer usage has passed an regressed watermark, we should
505    /// filter out the regressed watermark. Currently the kv log store may write
506    /// regressed watermark
507    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
508        if let Some(watermark_index) = &self.table_watermarks {
509            watermark_index.filter_regress_watermarks(watermarks)
510        }
511    }
512
513    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
514        self.table_watermarks
515            .as_ref()
516            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
517    }
518
519    pub fn is_replicated(&self) -> bool {
520        self.is_replicated
521    }
522
523    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
524        std::mem::replace(&mut self.vnodes, vnodes)
525    }
526
527    pub fn contains(&self, vnode: VirtualNode) -> bool {
528        self.vnodes.is_set(vnode.to_index())
529    }
530
531    pub fn vnodes(&self) -> Arc<Bitmap> {
532        self.vnodes.clone()
533    }
534}
535
536pub fn read_filter_for_version(
537    epoch: HummockEpoch,
538    table_id: TableId,
539    mut table_key_range: TableKeyRange,
540    read_version: &RwLock<HummockReadVersion>,
541) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
542    let read_version_guard = read_version.read();
543
544    let committed_version = read_version_guard.committed().clone();
545
546    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
547        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
548    }
549
550    let (imm_iter, sst_iter) =
551        read_version_guard
552            .staging()
553            .prune_overlap(epoch, table_id, &table_key_range);
554
555    let imms = imm_iter.cloned().collect();
556    let ssts = sst_iter.cloned().collect();
557
558    Ok((table_key_range, (imms, ssts, committed_version)))
559}
560
561#[derive(Clone)]
562pub struct HummockVersionReader {
563    sstable_store: SstableStoreRef,
564
565    /// Statistics
566    state_store_metrics: Arc<HummockStateStoreMetrics>,
567    preload_retry_times: usize,
568}
569
570/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
571/// `streaming_query`
572impl HummockVersionReader {
573    pub fn new(
574        sstable_store: SstableStoreRef,
575        state_store_metrics: Arc<HummockStateStoreMetrics>,
576        preload_retry_times: usize,
577    ) -> Self {
578        Self {
579            sstable_store,
580            state_store_metrics,
581            preload_retry_times,
582        }
583    }
584
585    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
586        &self.state_store_metrics
587    }
588}
589
/// Threshold, in seconds, above which fetching SST metadata for an iterator counts as slow.
/// NOTE(review): meaning inferred from the name; the consumer lies outside this excerpt.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
591
592impl HummockVersionReader {
593    pub async fn get<'a, O>(
594        &'a self,
595        table_key: TableKey<Bytes>,
596        epoch: u64,
597        table_id: TableId,
598        read_options: ReadOptions,
599        read_version_tuple: ReadVersionTuple,
600        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
601    ) -> StorageResult<Option<O>> {
602        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
603
604        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
605        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
606        let local_stats = &mut stats_guard.local_stats;
607        local_stats.found_key = true;
608
609        // 1. read staging data
610        for imm in &imms {
611            // skip imm that only holding out-of-date data
612            if imm.max_epoch() < min_epoch {
613                continue;
614            }
615
616            local_stats.staging_imm_get_count += 1;
617
618            if let Some((data, data_epoch)) = get_from_batch(
619                imm,
620                TableKey(table_key.as_ref()),
621                epoch,
622                &read_options,
623                local_stats,
624            ) {
625                return Ok(if data_epoch.pure_epoch() < min_epoch {
626                    None
627                } else {
628                    data.into_user_value()
629                        .map(|v| {
630                            on_key_value_fn(
631                                FullKey::new_with_gap_epoch(
632                                    table_id,
633                                    table_key.to_ref(),
634                                    data_epoch,
635                                ),
636                                v.as_ref(),
637                            )
638                        })
639                        .transpose()?
640                });
641            }
642        }
643
644        // 2. order guarantee: imm -> sst
645        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
646            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
647        });
648
649        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
650        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
651        let full_key = FullKey::new_with_gap_epoch(
652            table_id,
653            TableKey(table_key.clone()),
654            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
655        );
656        let single_table_key_range = table_key.clone()..=table_key.clone();
657
658        // prune uncommitted ssts with the keyrange
659        let pruned_uncommitted_ssts =
660            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
661        for local_sst in pruned_uncommitted_ssts {
662            local_stats.staging_sst_get_count += 1;
663            if let Some(iter) = get_from_sstable_info(
664                self.sstable_store.clone(),
665                local_sst,
666                full_key.to_ref(),
667                &read_options,
668                dist_key_hash,
669                local_stats,
670            )
671            .await?
672            {
673                debug_assert!(iter.is_valid());
674                let data_epoch = iter.key().epoch_with_gap;
675                return Ok(if data_epoch.pure_epoch() < min_epoch {
676                    None
677                } else {
678                    iter.value()
679                        .into_user_value()
680                        .map(|v| {
681                            on_key_value_fn(
682                                FullKey::new_with_gap_epoch(
683                                    table_id,
684                                    table_key.to_ref(),
685                                    data_epoch,
686                                ),
687                                v,
688                            )
689                        })
690                        .transpose()?
691                });
692            }
693        }
694        // 3. read from committed_version sst file
695        // Because SST meta records encoded key range,
696        // the filter key needs to be encoded as well.
697        assert!(committed_version.is_valid());
698        for level in committed_version.levels(table_id) {
699            if level.table_infos.is_empty() {
700                continue;
701            }
702
703            match level.level_type {
704                LevelType::Overlapping | LevelType::Unspecified => {
705                    let sstable_infos = prune_overlapping_ssts(
706                        &level.table_infos,
707                        table_id,
708                        &single_table_key_range,
709                    );
710                    for sstable_info in sstable_infos {
711                        local_stats.overlapping_get_count += 1;
712                        if let Some(iter) = get_from_sstable_info(
713                            self.sstable_store.clone(),
714                            sstable_info,
715                            full_key.to_ref(),
716                            &read_options,
717                            dist_key_hash,
718                            local_stats,
719                        )
720                        .await?
721                        {
722                            debug_assert!(iter.is_valid());
723                            let data_epoch = iter.key().epoch_with_gap;
724                            return Ok(if data_epoch.pure_epoch() < min_epoch {
725                                None
726                            } else {
727                                iter.value()
728                                    .into_user_value()
729                                    .map(|v| {
730                                        on_key_value_fn(
731                                            FullKey::new_with_gap_epoch(
732                                                table_id,
733                                                table_key.to_ref(),
734                                                data_epoch,
735                                            ),
736                                            v,
737                                        )
738                                    })
739                                    .transpose()?
740                            });
741                        }
742                    }
743                }
744                LevelType::Nonoverlapping => {
745                    let mut table_info_idx =
746                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
747                    if table_info_idx == 0 {
748                        continue;
749                    }
750                    table_info_idx = table_info_idx.saturating_sub(1);
751                    let ord = level.table_infos[table_info_idx]
752                        .key_range
753                        .compare_right_with_user_key(full_key.user_key.as_ref());
754                    // the case that the key falls into the gap between two ssts
755                    if ord == Ordering::Less {
756                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
757                        continue;
758                    }
759
760                    local_stats.non_overlapping_get_count += 1;
761                    if let Some(iter) = get_from_sstable_info(
762                        self.sstable_store.clone(),
763                        &level.table_infos[table_info_idx],
764                        full_key.to_ref(),
765                        &read_options,
766                        dist_key_hash,
767                        local_stats,
768                    )
769                    .await?
770                    {
771                        debug_assert!(iter.is_valid());
772                        let data_epoch = iter.key().epoch_with_gap;
773                        return Ok(if data_epoch.pure_epoch() < min_epoch {
774                            None
775                        } else {
776                            iter.value()
777                                .into_user_value()
778                                .map(|v| {
779                                    on_key_value_fn(
780                                        FullKey::new_with_gap_epoch(
781                                            table_id,
782                                            table_key.to_ref(),
783                                            data_epoch,
784                                        ),
785                                        v,
786                                    )
787                                })
788                                .transpose()?
789                        });
790                    }
791                }
792            }
793        }
794        stats_guard.local_stats.found_key = false;
795        Ok(None)
796    }
797
798    pub async fn iter(
799        &self,
800        table_key_range: TableKeyRange,
801        epoch: u64,
802        table_id: TableId,
803        read_options: ReadOptions,
804        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
805    ) -> StorageResult<HummockStorageIterator> {
806        self.iter_with_memtable(
807            table_key_range,
808            epoch,
809            table_id,
810            read_options,
811            read_version_tuple,
812            None,
813        )
814        .await
815    }
816
817    pub async fn iter_with_memtable<'b>(
818        &self,
819        table_key_range: TableKeyRange,
820        epoch: u64,
821        table_id: TableId,
822        read_options: ReadOptions,
823        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
824        memtable_iter: Option<MemTableHummockIterator<'b>>,
825    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
826        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
827        let user_key_range = (
828            user_key_range_ref.0.map(|key| key.cloned()),
829            user_key_range_ref.1.map(|key| key.cloned()),
830        );
831        let mut factory = ForwardIteratorFactory::default();
832        let mut local_stats = StoreLocalStatistic::default();
833        let (imms, uncommitted_ssts, committed) = read_version_tuple;
834        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
835        self.iter_inner(
836            table_key_range,
837            epoch,
838            table_id,
839            read_options,
840            imms,
841            uncommitted_ssts,
842            &committed,
843            &mut local_stats,
844            &mut factory,
845        )
846        .await?;
847        let merge_iter = factory.build(memtable_iter);
848        // the epoch_range left bound for iterator read
849        let mut user_iter = UserIterator::new(
850            merge_iter,
851            user_key_range,
852            epoch,
853            min_epoch,
854            Some(committed),
855        );
856        user_iter.rewind().await?;
857        Ok(HummockStorageIteratorInner::new(
858            user_iter,
859            self.state_store_metrics.clone(),
860            table_id,
861            local_stats,
862        ))
863    }
864
865    pub async fn rev_iter<'b>(
866        &self,
867        table_key_range: TableKeyRange,
868        epoch: u64,
869        table_id: TableId,
870        read_options: ReadOptions,
871        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
872        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
873    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
874        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
875        let user_key_range = (
876            user_key_range_ref.0.map(|key| key.cloned()),
877            user_key_range_ref.1.map(|key| key.cloned()),
878        );
879        let mut factory = BackwardIteratorFactory::default();
880        let mut local_stats = StoreLocalStatistic::default();
881        let (imms, uncommitted_ssts, committed) = read_version_tuple;
882        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
883        self.iter_inner(
884            table_key_range,
885            epoch,
886            table_id,
887            read_options,
888            imms,
889            uncommitted_ssts,
890            &committed,
891            &mut local_stats,
892            &mut factory,
893        )
894        .await?;
895        let merge_iter = factory.build(memtable_iter);
896        // the epoch_range left bound for iterator read
897        let mut user_iter = BackwardUserIterator::new(
898            merge_iter,
899            user_key_range,
900            epoch,
901            min_epoch,
902            Some(committed),
903        );
904        user_iter.rewind().await?;
905        Ok(HummockStorageRevIteratorInner::new(
906            user_iter,
907            self.state_store_metrics.clone(),
908            table_id,
909            local_stats,
910        ))
911    }
912
    /// Registers every source iterator needed to read `table_key_range` at `epoch` into
    /// `factory`. Shared by the forward (`iter_with_memtable`) and backward (`rev_iter`) paths.
    ///
    /// Sources are added in this order:
    /// 1. one batch iterator per immutable memtable in `imms`;
    /// 2. one SST iterator per uncommitted (staging) SST in `uncommitted_ssts` that is not
    ///    skipped by the bloom-filter check;
    /// 3. iterators for each level of `committed` for `table_id`, after pruning with
    ///    `prune_nonoverlapping_ssts` / `prune_overlapping_ssts`.
    ///
    /// Iterator counts are written into `local_stats`. `local_stats` is also passed to the
    /// sstable store and the bloom-filter checks.
    ///
    /// # Errors
    ///
    /// Returns an error if the right bound of `table_key_range` sorts before its left bound. In
    /// builds with `debug_assertions` this case panics instead. Errors from fetching SST
    /// metadata through the sstable store are propagated.
    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        // Reject a key range whose right bound sorts before its left bound. Unbounded ends and
        // equal bounds are accepted.
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        // 1. build iterators from the immutable memtables (imms)
        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterators from uncommitted (staging) SSTs, then from the committed version
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // Hash the prefix hint for the bloom filter once and reuse it for every SST below.
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        // With prefetch enabled, SST iterators are given the range's end user key
        // (`must_iterated_end_user_key`) and the configured preload retry budget.
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            // Skip this SST when a prefix hint is set and the bloom-filter check misses.
            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        // Only meta fetching for the committed levels below is timed for the slow-fetch warning.
        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                // Several remaining SSTs are read through a single concat iterator, with no
                // bloom-filter check. A single SST goes through the bloom filter and gets a
                // plain SST iterator.
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping level: each remaining SST (visited in reverse order of the pruning
                // output) gets its own iterator, unless the bloom-filter check skips it.
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        // When committed-level meta fetching exceeded the threshold, log a warning and record
        // the meta-block cache miss count in the metrics.
        // NOTE(review): `{:?}` on `table_id_string` prints the id quoted.
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }
1102
1103    pub async fn iter_log(
1104        &self,
1105        version: PinnedVersion,
1106        epoch_range: (u64, u64),
1107        key_range: TableKeyRange,
1108        options: ReadLogOptions,
1109    ) -> HummockResult<ChangeLogIterator> {
1110        let change_log = {
1111            let table_change_logs = version.table_change_log_read_lock();
1112            if let Some(change_log) = table_change_logs.get(&options.table_id) {
1113                change_log.filter_epoch(epoch_range).cloned().collect_vec()
1114            } else {
1115                Vec::new()
1116            }
1117        };
1118
1119        if let Some(max_epoch_change_log) = change_log.last() {
1120            let (_, max_epoch) = epoch_range;
1121            if !max_epoch_change_log.epochs().contains(&max_epoch) {
1122                warn!(
1123                    max_epoch,
1124                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
1125                    table_id = %options.table_id,
1126                    "max_epoch does not exist"
1127                );
1128            }
1129        }
1130        let read_options = Arc::new(SstableIteratorReadOptions {
1131            cache_policy: Default::default(),
1132            must_iterated_end_user_key: None,
1133            max_preload_retry_times: 0,
1134            prefetch_for_large_query: false,
1135        });
1136
1137        async fn make_iter(
1138            sstable_infos: impl Iterator<Item = &SstableInfo>,
1139            sstable_store: &SstableStoreRef,
1140            read_options: Arc<SstableIteratorReadOptions>,
1141            local_stat: &mut StoreLocalStatistic,
1142        ) -> HummockResult<MergeIterator<SstableIterator>> {
1143            let iters = try_join_all(sstable_infos.map(|sstable_info| {
1144                let sstable_store = sstable_store.clone();
1145                let read_options = read_options.clone();
1146                async move {
1147                    let mut local_stat = StoreLocalStatistic::default();
1148                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1149                    Ok::<_, HummockError>((
1150                        SstableIterator::new(
1151                            table_holder,
1152                            sstable_store,
1153                            read_options,
1154                            sstable_info,
1155                        ),
1156                        local_stat,
1157                    ))
1158                }
1159            }))
1160            .await?;
1161            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1162                |(iter, stats)| {
1163                    local_stat.add(&stats);
1164                    iter
1165                },
1166            )))
1167        }
1168
1169        let mut local_stat = StoreLocalStatistic::default();
1170
1171        let new_value_iter = make_iter(
1172            change_log
1173                .iter()
1174                .flat_map(|log| log.new_value.iter())
1175                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1176            &self.sstable_store,
1177            read_options.clone(),
1178            &mut local_stat,
1179        )
1180        .await?;
1181        let old_value_iter = make_iter(
1182            change_log
1183                .iter()
1184                .flat_map(|log| log.old_value.iter())
1185                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1186            &self.sstable_store,
1187            read_options.clone(),
1188            &mut local_stat,
1189        )
1190        .await?;
1191        ChangeLogIterator::new(
1192            epoch_range,
1193            key_range,
1194            new_value_iter,
1195            old_value_iter,
1196            options.table_id,
1197            IterLocalMetricsGuard::new(
1198                self.state_store_metrics.clone(),
1199                options.table_id,
1200                local_stat,
1201            ),
1202        )
1203        .await
1204    }
1205
1206    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
1207        &'a self,
1208        version: PinnedVersion,
1209        table_id: TableId,
1210        target: VectorRef<'a>,
1211        options: VectorNearestOptions,
1212        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1213    ) -> HummockResult<Vec<O>> {
1214        let Some(index) = version.vector_indexes.get(&table_id) else {
1215            return Ok(vec![]);
1216        };
1217        if target.dimension() != index.dimension {
1218            return Err(HummockError::other(format!(
1219                "target dimension {} not match index dimension {}",
1220                target.dimension(),
1221                index.dimension
1222            )));
1223        }
1224        match &index.inner {
1225            VectorIndexImpl::Flat(flat) => {
1226                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
1227                let mut cache_stat = VectorStoreCacheStats::default();
1228                for vector_file in &flat.vector_store_info.vector_files {
1229                    let meta = self
1230                        .sstable_store
1231                        .get_vector_file_meta(vector_file, &mut cache_stat)
1232                        .await?;
1233                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
1234                        let block = self
1235                            .sstable_store
1236                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
1237                            .await?;
1238                        builder.add(&**block, &on_nearest_item_fn);
1239                    }
1240                }
1241                cache_stat.report(table_id, "flat", self.stats());
1242                Ok(builder.finish())
1243            }
1244            VectorIndexImpl::HnswFlat(hnsw_flat) => {
1245                let Some(graph_file) = &hnsw_flat.graph_file else {
1246                    return Ok(vec![]);
1247                };
1248
1249                let mut ctx = FileVectorStoreCtx::default();
1250
1251                let graph = self
1252                    .sstable_store
1253                    .get_hnsw_graph(graph_file, &mut ctx.stats)
1254                    .await?;
1255
1256                let vector_store =
1257                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
1258                let (items, stats) = nearest::<O, M, _>(
1259                    &vector_store,
1260                    &mut ctx,
1261                    &*graph,
1262                    target,
1263                    on_nearest_item_fn,
1264                    options.hnsw_ef_search,
1265                    options.top_n,
1266                )
1267                .await?;
1268                ctx.stats.report(table_id, "hnsw_read", self.stats());
1269                report_hnsw_stat(
1270                    self.stats(),
1271                    table_id,
1272                    "hnsw_read",
1273                    options.top_n,
1274                    options.hnsw_ef_search,
1275                    [stats],
1276                );
1277                Ok(items)
1278            }
1279        }
1280    }
1281}