risingwave_storage/hummock/store/version.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::vec_deque::VecDeque;
use std::collections::{Bound, HashMap};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use futures::future::try_join_all;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::key_range::KeyRangeCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::{
    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::vector_index::VectorIndexImpl;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::LevelType;
use sync_point::sync_point;
use tracing::warn;

use crate::error::StorageResult;
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::change_log::ChangeLogIterator;
use crate::hummock::iterator::{
    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::utils::{
    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
    search_sst_idx,
};
use crate::hummock::vector::file::{FileVectorStore, FileVectorStoreCtx};
use crate::hummock::vector::monitor::{VectorStoreCacheStats, report_hnsw_stat};
use crate::hummock::{
    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
    hit_sstable_bloom_filter,
};
use crate::mem_table::{
    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{
    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{
    OnNearestItemFn, ReadLogOptions, ReadOptions, VectorNearestOptions, gen_min_epoch,
};
use crate::vector::hnsw::nearest;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type CommittedVersion = PinnedVersion;

/// Data not committed to Hummock. There are two types of staging data:
/// - Immutable memtable: data that has been written into the local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to the
///   hummock version.
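///
/// # Example
///
/// An illustrative sketch (not from the codebase) of the ordering invariants the
/// uploader is expected to maintain; all names below are hypothetical placeholders:
///
/// ```ignore
/// let staging_sst = StagingSstableInfo::new(
///     sstable_infos,           // SSTs produced by the upload, newest data first
///     old_value_sstable_infos, // old-value SSTs, if any
///     vec![epoch2, epoch1],    // epoch2 > epoch1: the newer epoch comes first
///     HashMap::from([(instance_id, vec![imm_id2, imm_id1])]), // newest imm first
///     imm_size,
/// );
/// ```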
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    // newer data comes first
    sstable_infos: Vec<LocalSstableInfo>,
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    // newer data at the front
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    imm_size: usize,
}

impl StagingSstableInfo {
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // the epochs are sorted from higher epoch to lower epoch
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}

pub enum VersionUpdate {
    Sst(Arc<StagingSstableInfo>),
    CommittedSnapshot(CommittedVersion),
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}

#[derive(Clone)]
pub struct StagingVersion {
    pending_imm_size: usize,
    /// Imms that have been added but not yet sent to the uploader of the hummock event handler.
    /// It is non-empty only when `upload_on_flush` is false.
    ///
    /// They are sent to the uploader when `pending_imm_size` exceeds the threshold or on
    /// `seal_current_epoch`.
    ///
    /// Newer data comes last.
    pub pending_imms: Vec<ImmutableMemtable>,
    /// Imms that have already been sent to the uploader of the hummock event handler.
    /// Note: currently, building an imm and writing it to the staging version is not atomic,
    /// so an imm with a smaller batch id may be added later than one with a greater batch id.
    ///
    /// Newer data comes first.
    pub uploading_imms: VecDeque<ImmutableMemtable>,

    // newer data comes first
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}

impl StagingVersion {
    /// Get the `imm`s and `sst`s that overlap respectively with `table_key_range` and the
    /// user key range derived from `table_id`, `epoch` and `table_key_range`.
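    ///
    /// A hedged usage sketch (`staging`, `epoch`, `table_id` and `range` are assumed
    /// to be in scope):
    ///
    /// ```ignore
    /// let (imm_iter, sst_iter) = staging.prune_overlap(epoch, table_id, &range);
    /// let imms: Vec<_> = imm_iter.cloned().collect();
    /// let ssts: Vec<_> = sst_iter.cloned().collect();
    /// ```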
    pub fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: HummockEpoch,
        table_id: TableId,
        table_key_range: &'a TableKeyRange,
    ) -> (
        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
        impl Iterator<Item = &'a SstableInfo> + 'a,
    ) {
        let (left, right) = table_key_range;
        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
        let overlapped_imms = self
            .pending_imms
            .iter()
            .rev() // rev to let newer imms come first
            .chain(self.uploading_imms.iter())
            .filter(move |imm| {
                // retain imms that overlap with (min_epoch_exclusive, max_epoch_inclusive]
                imm.min_epoch() <= max_epoch_inclusive
                    && imm.table_id == table_id
                    && range_overlap(
                        &(left, right),
                        &imm.start_table_key(),
                        Bound::Included(&imm.end_table_key()),
                    )
            });

        // TODO: Remove duplicate sst based on sst id
        let overlapped_ssts = self
            .sst
            .iter()
            .filter(move |staging_sst| {
                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
                sst_max_epoch <= max_epoch_inclusive
            })
            .flat_map(move |staging_sst| {
                // TODO: sstable info should be concat-able after each streaming table owns a read
                // version. May use concat sstable iter instead in some cases.
                staging_sst
                    .sstable_infos
                    .iter()
                    .map(|sstable| &sstable.sst_info)
                    .filter(move |sstable: &&SstableInfo| {
                        filter_single_sst(sstable, table_id, table_key_range)
                    })
            });
        (overlapped_imms, overlapped_ssts)
    }

    pub fn is_empty(&self) -> bool {
        self.pending_imms.is_empty() && self.uploading_imms.is_empty() && self.sst.is_empty()
    }
}

#[derive(Clone)]
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    table_id: TableId,
    instance_id: LocalInstanceId,

    is_initialized: bool,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicates whether this read version is replicated. If it is, we should ignore it during
    /// global state store reads to avoid duplicated results. For a local state store read it is
    /// fine, since we will see the `ReadVersion` only for that local state store.
    is_replicated: bool,

    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version.
    // It will be initialized after the local state store is initialized.
    vnodes: Arc<Bitmap>,
}

impl HummockReadVersion {
    pub fn new_with_replication_option(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        is_replicated: bool,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        // Before building a `HummockReadVersion`, we need an initial version obtained from meta.
        // We want this initialization to happen after the version is initialized (now via
        // notification), so we assert here to guarantee the correct initialization order.
        assert!(committed_version.is_valid());
        Self {
            table_id,
            instance_id,
            table_watermarks: {
                match committed_version.table_watermarks.get(&table_id) {
                    Some(table_watermarks) => match table_watermarks.watermark_type {
                        WatermarkSerdeType::PkPrefix => {
                            Some(PkPrefixTableWatermarksIndex::new_committed(
                                table_watermarks.clone(),
                                committed_version
                                    .state_table_info
                                    .info()
                                    .get(&table_id)
                                    .expect("should exist")
                                    .committed_epoch,
                            ))
                        }
                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
                        WatermarkSerdeType::Value => None,
                    },
                    None => None,
                }
            },
            staging: StagingVersion {
                pending_imm_size: 0,
                pending_imms: Vec::default(),
                uploading_imms: VecDeque::default(),
                sst: VecDeque::default(),
            },

            committed: committed_version,

            is_replicated,
            vnodes,
            is_initialized: false,
        }
    }

    pub fn new(
        table_id: TableId,
        instance_id: LocalInstanceId,
        committed_version: CommittedVersion,
        vnodes: Arc<Bitmap>,
    ) -> Self {
        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn init(&mut self) {
        assert!(!self.is_initialized);
        self.is_initialized = true;
    }

    pub fn add_imm(&mut self, imm: ImmutableMemtable) {
        assert!(self.is_initialized);
        if let Some(item) = self
            .staging
            .pending_imms
            .last()
            .or_else(|| self.staging.uploading_imms.front())
        {
            // check batch_id order from newest to oldest
            debug_assert!(item.batch_id() < imm.batch_id());
        }

        self.staging.pending_imm_size += imm.size();
        self.staging.pending_imms.push(imm);
    }

    pub fn pending_imm_size(&self) -> usize {
        self.staging.pending_imm_size
    }

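    /// Moves all pending imms into the uploading queue and returns them so that the
    /// caller can hand them over to the uploader.
    ///
    /// A hedged sketch of the intended call pattern (`threshold` and `uploader` are
    /// hypothetical names, not from this module):
    ///
    /// ```ignore
    /// if read_version.pending_imm_size() > threshold {
    ///     let imms = read_version.start_upload_pending_imms();
    ///     uploader.upload(imms);
    /// }
    /// ```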
    pub fn start_upload_pending_imms(&mut self) -> Vec<ImmutableMemtable> {
        let pending_imms = std::mem::take(&mut self.staging.pending_imms);
        for imm in &pending_imms {
            self.staging.uploading_imms.push_front(imm.clone());
        }
        self.staging.pending_imm_size = 0;
        pending_imms
    }

    /// Updates the read version with a `VersionUpdate`. Three kinds of updates are processed:
    /// - `VersionUpdate::Sst`: push the staging SST into `staging.sst` and remove the
    ///   corresponding uploading imms according to their batch ids.
    /// - `VersionUpdate::CommittedSnapshot`: update `committed_version`, then clean up the
    ///   related staging SSTs and imms in memory according to the committed epoch.
    /// - `VersionUpdate::NewTableWatermark`: apply the new watermark to the table watermark
    ///   index.
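    ///
    /// A hedged sketch of the expected call order (`staging_sst` and
    /// `new_committed_version` are hypothetical values):
    ///
    /// ```ignore
    /// // An SST uploaded from imms arrives first ...
    /// read_version.update(VersionUpdate::Sst(staging_sst));
    /// // ... and is pruned once a committed snapshot covering its epochs arrives.
    /// read_version.update(VersionUpdate::CommittedSnapshot(new_committed_version));
    /// ```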
    pub fn update(&mut self, info: VersionUpdate) {
        match info {
            VersionUpdate::Sst(staging_sst_ref) => {
                {
                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
                        warn!(
                            instance_id = self.instance_id,
                            "no related imm in sst input"
                        );
                        return;
                    };

                    // old data comes first
                    for imm_id in imms.iter().rev() {
                        let check_err = match self.staging.uploading_imms.pop_back() {
                            None => Some("empty".to_owned()),
                            Some(prev_imm_id) => {
                                if prev_imm_id.batch_id() == *imm_id {
                                    None
                                } else {
                                    Some(format!(
                                        "mismatched id {} {}",
                                        prev_imm_id.batch_id(),
                                        *imm_id
                                    ))
                                }
                            }
                        };
                        assert!(
                            check_err.is_none(),
                            "should be valid staging_sst.size {},
                                    staging_sst.imm_ids {:?},
                                    staging_sst.epochs {:?},
                                    local_pending_imm_ids {:?},
                                    local_uploading_imm_ids {:?},
                                    instance_id {}
                                    check_err {:?}",
                            staging_sst_ref.imm_size,
                            staging_sst_ref.imm_ids,
                            staging_sst_ref.epochs,
                            self.staging
                                .pending_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.staging
                                .uploading_imms
                                .iter()
                                .map(|imm| imm.batch_id())
                                .collect_vec(),
                            self.instance_id,
                            check_err
                        );
                    }

                    self.staging.sst.push_front(staging_sst_ref);
                }
            }

            VersionUpdate::CommittedSnapshot(committed_version) => {
                if let Some(info) = committed_version
                    .state_table_info
                    .info()
                    .get(&self.table_id)
                {
                    let committed_epoch = info.committed_epoch;
                    if self.is_replicated {
                        self.staging
                            .uploading_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                        self.staging
                            .pending_imms
                            .retain(|imm| imm.min_epoch() > committed_epoch);
                    } else {
                        self.staging
                            .pending_imms
                            .iter()
                            .chain(self.staging.uploading_imms.iter())
                            .for_each(|imm| {
                                assert!(
                                    imm.min_epoch() > committed_epoch,
                                    "imm of table {} min_epoch {} should be greater than committed_epoch {}",
                                    imm.table_id,
                                    imm.min_epoch(),
                                    committed_epoch
                                )
                            });
                    }

                    self.staging.sst.retain(|sst| {
                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
                    });

                    // check epochs.last() > MCE
                    assert!(self.staging.sst.iter().all(|sst| {
                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
                    }));

                    if let Some(committed_watermarks) =
                        committed_version.table_watermarks.get(&self.table_id)
                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
                    {
                        if let Some(watermark_index) = &mut self.table_watermarks {
                            watermark_index.apply_committed_watermarks(
                                committed_watermarks.clone(),
                                committed_epoch,
                            );
                        } else {
                            self.table_watermarks =
                                Some(PkPrefixTableWatermarksIndex::new_committed(
                                    committed_watermarks.clone(),
                                    committed_epoch,
                                ));
                        }
                    }
                }

                self.committed = committed_version;
            }
            VersionUpdate::NewTableWatermark {
                direction,
                epoch,
                vnode_watermarks,
                watermark_type,
            } => {
                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
                if let Some(watermark_index) = &mut self.table_watermarks {
                    watermark_index.add_epoch_watermark(
                        epoch,
                        Arc::from(vnode_watermarks),
                        direction,
                    );
                } else {
                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
                        direction,
                        epoch,
                        vnode_watermarks,
                        self.committed.table_committed_epoch(self.table_id),
                    ));
                }
            }
        }
    }

    pub fn staging(&self) -> &StagingVersion {
        &self.staging
    }

    pub fn committed(&self) -> &CommittedVersion {
        &self.committed
    }

    /// We assume that the watermark increases monotonically. Therefore, if the upper layer
    /// passes in a regressed watermark, we should filter it out. Currently the kv log store
    /// may write regressed watermarks.
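    ///
    /// A hedged sketch (`watermarks` is a hypothetical `Vec<VnodeWatermark>`; regressed
    /// entries are removed in place):
    ///
    /// ```ignore
    /// read_version.filter_regress_watermarks(&mut watermarks);
    /// for watermark in watermarks {
    ///     // only non-regressed watermarks remain here
    /// }
    /// ```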
    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
        if let Some(watermark_index) = &self.table_watermarks {
            watermark_index.filter_regress_watermarks(watermarks)
        }
    }

    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.table_watermarks
            .as_ref()
            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
    }

    pub fn is_initialized(&self) -> bool {
        self.is_initialized
    }

    pub fn is_replicated(&self) -> bool {
        self.is_replicated
    }

    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        std::mem::replace(&mut self.vnodes, vnodes)
    }

    pub fn contains(&self, vnode: VirtualNode) -> bool {
        self.vnodes.is_set(vnode.to_index())
    }

    pub fn vnodes(&self) -> Arc<Bitmap> {
        self.vnodes.clone()
    }
}

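/// Takes a snapshot of the read version under a short read lock: the key range is first
/// rewritten with the table watermark index, then the overlapping staging imms and SSTs
/// are pruned and cloned out together with the committed version.
///
/// A hedged usage sketch (`read_version` is an assumed `RwLock<HummockReadVersion>`):
///
/// ```ignore
/// let (key_range, (imms, ssts, committed_version)) =
///     read_filter_for_version(epoch, table_id, key_range, &read_version)?;
/// ```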
pub fn read_filter_for_version(
    epoch: HummockEpoch,
    table_id: TableId,
    mut table_key_range: TableKeyRange,
    read_version: &RwLock<HummockReadVersion>,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let read_version_guard = read_version.read();

    let committed_version = read_version_guard.committed().clone();

    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
    }

    let (imm_iter, sst_iter) =
        read_version_guard
            .staging()
            .prune_overlap(epoch, table_id, &table_key_range);

    let imms = imm_iter.cloned().collect();
    let ssts = sst_iter.cloned().collect();

    Ok((table_key_range, (imms, ssts, committed_version)))
}

#[derive(Clone)]
pub struct HummockVersionReader {
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    preload_retry_times: usize,
}

/// Use `HummockVersionReader` to reuse the `get` and `iter` implementations for both
/// `batch_query` and `streaming_query`.
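///
/// A hedged construction sketch (`sstable_store`, `metrics` and the `get` arguments are
/// assumed to be in scope; `0` means no preload retries):
///
/// ```ignore
/// let reader = HummockVersionReader::new(sstable_store, metrics, 0);
/// let value = reader
///     .get(table_key, epoch, table_id, read_options, read_version_tuple, on_key_value_fn)
///     .await?;
/// ```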
impl HummockVersionReader {
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}

const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;

impl HummockVersionReader {
    pub async fn get<'a, O>(
        &'a self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        local_stats.found_key = true;

        // 1. read staging data
        for imm in &imms {
            // skip imms that only hold out-of-date data
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. order guarantee: imm -> sst
        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
            Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.as_raw_id())
        });

        // The epoch passed in is the pure epoch, and we will seek the constructed `full_key`
        // later. Therefore, the `full_key` must be constructed with `MAX_SPILL_TIMES`;
        // otherwise the iterator might skip keys with a spill offset greater than 0.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        let single_table_key_range = table_key.clone()..=table_key.clone();

        // prune uncommitted ssts with the key range
        let pruned_uncommitted_ssts =
            prune_overlapping_ssts(&uncommitted_ssts, table_id, &single_table_key_range);
        for local_sst in pruned_uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        // 3. read from committed_version sst file
        // Because SST meta records the encoded key range,
        // the filter key needs to be encoded as well.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the case that the key falls into the gap between two ssts
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }

    pub async fn iter(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
    ) -> StorageResult<HummockStorageIterator> {
        self.iter_with_memtable(
            table_key_range,
            epoch,
            table_id,
            read_options,
            read_version_tuple,
            None,
        )
        .await
    }

    pub async fn iter_with_memtable<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockIterator<'b>>,
    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = ForwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the left bound of the epoch range for the iterator read
        let mut user_iter = UserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    pub async fn rev_iter<'b>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
        let user_key_range = (
            user_key_range_ref.0.map(|key| key.cloned()),
            user_key_range_ref.1.map(|key| key.cloned()),
        );
        let mut factory = BackwardIteratorFactory::default();
        let mut local_stats = StoreLocalStatistic::default();
        let (imms, uncommitted_ssts, committed) = read_version_tuple;
        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        self.iter_inner(
            table_key_range,
            epoch,
            table_id,
            read_options,
            imms,
            uncommitted_ssts,
            &committed,
            &mut local_stats,
            &mut factory,
        )
        .await?;
        let merge_iter = factory.build(memtable_iter);
        // `min_epoch` is the left bound of the epoch range for the iterator read
        let mut user_iter = BackwardUserIterator::new(
            merge_iter,
            user_key_range,
            epoch,
            min_epoch,
            Some(committed),
        );
        user_iter.rewind().await?;
        Ok(HummockStorageRevIteratorInner::new(
            user_iter,
            self.state_store_metrics.clone(),
            table_id,
            local_stats,
        ))
    }

    async fn iter_inner<F: IteratorFactory>(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        imms: Vec<ImmutableMemtable>,
        uncommitted_ssts: Vec<SstableInfo>,
        committed: &CommittedVersion,
        local_stats: &mut StoreLocalStatistic,
        factory: &mut F,
    ) -> StorageResult<()> {
        {
            fn bound_inner<T>(bound: &Bound<T>) -> Option<&T> {
                match bound {
                    Bound::Included(bound) | Bound::Excluded(bound) => Some(bound),
                    Bound::Unbounded => None,
                }
            }
            let (left, right) = &table_key_range;
            if let (Some(left), Some(right)) = (bound_inner(left), bound_inner(right))
                && right < left
            {
                if cfg!(debug_assertions) {
                    panic!("invalid iter key range: {table_id} {left:?} {right:?}")
                } else {
                    return Err(HummockError::other(format!(
                        "invalid iter key range: {table_id} {left:?} {right:?}"
                    ))
                    .into());
                }
            }
        }

        local_stats.staging_imm_iter_count = imms.len() as u64;
        for imm in imms {
            factory.add_batch_iter(imm);
        }

        // 2. build iterator from committed
        // Because SST meta records encoded key range,
        // the filter key range needs to be encoded as well.
        let user_key_range = bound_table_key_range(table_id, &table_key_range);
        let user_key_range_ref = (
            user_key_range.0.as_ref().map(UserKey::as_ref),
            user_key_range.1.as_ref().map(UserKey::as_ref),
        );
        let mut staging_sst_iter_count = 0;
        // encode once
        let bloom_filter_prefix_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.as_raw_id()));
        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
        if read_options.prefetch_options.prefetch {
            sst_read_options.must_iterated_end_user_key =
                Some(user_key_range.1.map(|key| key.cloned()));
            sst_read_options.max_preload_retry_times = self.preload_retry_times;
        }
        let sst_read_options = Arc::new(sst_read_options);
        for sstable_info in &uncommitted_ssts {
            let table_holder = self
                .sstable_store
                .sstable(sstable_info, local_stats)
                .await?;

            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref()
                && !hit_sstable_bloom_filter(
                    &table_holder,
                    &user_key_range_ref,
                    *prefix_hash,
                    local_stats,
                )
            {
                continue;
            }

            staging_sst_iter_count += 1;
            factory.add_staging_sst_iter(F::SstableIteratorType::create(
                table_holder,
                self.sstable_store.clone(),
                sst_read_options.clone(),
                sstable_info,
            ));
        }
        local_stats.staging_sst_iter_count = staging_sst_iter_count;

        let timer = Instant::now();

        for level in committed.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                let mut table_infos =
                    prune_nonoverlapping_ssts(&level.table_infos, user_key_range_ref, table_id)
                        .peekable();

                if table_infos.peek().is_none() {
                    continue;
                }
                let sstable_infos = table_infos.cloned().collect_vec();
                if sstable_infos.len() > 1 {
                    factory.add_concat_sst_iter(
                        sstable_infos,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                    );
                    local_stats.non_overlapping_iter_count += 1;
                } else {
                    let sstable = self
                        .sstable_store
                        .sstable(&sstable_infos[0], local_stats)
                        .await?;

                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    // Since there is only one sst to be included for the current non-overlapping
                    // level, there is no need to create a ConcatIterator on it.
                    // We put the SstableIterator in `overlapping_iters` just for convenience since
                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
                    // it in `non_overlapping_iter_count`.
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        &sstable_infos[0],
                    ));
                    local_stats.non_overlapping_iter_count += 1;
                }
            } else {
                let table_infos =
                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
                // Overlapping
                let fetch_meta_req = table_infos.rev().collect_vec();
                if fetch_meta_req.is_empty() {
                    continue;
                }
                for sstable_info in fetch_meta_req {
                    let sstable = self
                        .sstable_store
                        .sstable(sstable_info, local_stats)
                        .await?;
                    assert_eq!(sstable_info.object_id, sstable.id);
                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref()
                        && !hit_sstable_bloom_filter(
                            &sstable,
                            &user_key_range_ref,
                            *dist_hash,
                            local_stats,
                        )
                    {
                        continue;
                    }
                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
                        sstable,
                        self.sstable_store.clone(),
                        sst_read_options.clone(),
                        sstable_info,
                    ));
                    local_stats.overlapping_iter_count += 1;
                }
            }
        }
        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
            let table_id_string = table_id.to_string();
            tracing::warn!(
                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
                table_id_string,
                epoch,
                fetch_meta_duration_sec,
                local_stats.cache_meta_block_miss
            );
            self.state_store_metrics
                .iter_slow_fetch_meta_cache_unhits
                .set(local_stats.cache_meta_block_miss as i64);
        }
        Ok(())
    }

    pub async fn iter_log(
        &self,
        version: PinnedVersion,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> HummockResult<ChangeLogIterator> {
        let change_log = {
            let table_change_logs = version.table_change_log_read_lock();
            if let Some(change_log) = table_change_logs.get(&options.table_id) {
                change_log.filter_epoch(epoch_range).cloned().collect_vec()
            } else {
                Vec::new()
            }
        };

        if let Some(max_epoch_change_log) = change_log.last() {
            let (_, max_epoch) = epoch_range;
            if !max_epoch_change_log.epochs().contains(&max_epoch) {
                warn!(
                    max_epoch,
                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
                    table_id = %options.table_id,
                    "max_epoch does not exist"
                );
            }
        }
        let read_options = Arc::new(SstableIteratorReadOptions {
            cache_policy: Default::default(),
            must_iterated_end_user_key: None,
            max_preload_retry_times: 0,
            prefetch_for_large_query: false,
        });

        async fn make_iter(
            sstable_infos: impl Iterator<Item = &SstableInfo>,
            sstable_store: &SstableStoreRef,
            read_options: Arc<SstableIteratorReadOptions>,
            local_stat: &mut StoreLocalStatistic,
        ) -> HummockResult<MergeIterator<SstableIterator>> {
            let iters = try_join_all(sstable_infos.map(|sstable_info| {
                let sstable_store = sstable_store.clone();
                let read_options = read_options.clone();
                async move {
                    let mut local_stat = StoreLocalStatistic::default();
                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
                    Ok::<_, HummockError>((
                        SstableIterator::new(
                            table_holder,
                            sstable_store,
                            read_options,
                            sstable_info,
                        ),
                        local_stat,
                    ))
                }
            }))
            .await?;
            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
                |(iter, stats)| {
                    local_stat.add(&stats);
                    iter
                },
            )))
        }

        let mut local_stat = StoreLocalStatistic::default();

        let new_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.new_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        let old_value_iter = make_iter(
            change_log
                .iter()
                .flat_map(|log| log.old_value.iter())
                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
            &self.sstable_store,
            read_options.clone(),
            &mut local_stat,
        )
        .await?;
        ChangeLogIterator::new(
            epoch_range,
            key_range,
            new_value_iter,
            old_value_iter,
            options.table_id,
            IterLocalMetricsGuard::new(
                self.state_store_metrics.clone(),
                options.table_id,
                local_stat,
            ),
        )
        .await
    }

    pub async fn nearest<'a, M: MeasureDistanceBuilder, O: Send>(
        &'a self,
        version: PinnedVersion,
        table_id: TableId,
        target: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> HummockResult<Vec<O>> {
        let Some(index) = version.vector_indexes.get(&table_id) else {
            return Ok(vec![]);
        };
        if target.dimension() != index.dimension {
            return Err(HummockError::other(format!(
                "target dimension {} does not match index dimension {}",
                target.dimension(),
                index.dimension
            )));
        }
        match &index.inner {
            VectorIndexImpl::Flat(flat) => {
                let mut builder = NearestBuilder::<'_, O, M>::new(target, options.top_n);
                let mut cache_stat = VectorStoreCacheStats::default();
                for vector_file in &flat.vector_store_info.vector_files {
                    let meta = self
                        .sstable_store
                        .get_vector_file_meta(vector_file, &mut cache_stat)
                        .await?;
                    for (i, block_meta) in meta.block_metas.iter().enumerate() {
                        let block = self
                            .sstable_store
                            .get_vector_block(vector_file, i, block_meta, &mut cache_stat)
                            .await?;
                        builder.add(&**block, &on_nearest_item_fn);
                    }
                }
                cache_stat.report(table_id, "flat", self.stats());
                Ok(builder.finish())
            }
            VectorIndexImpl::HnswFlat(hnsw_flat) => {
                let Some(graph_file) = &hnsw_flat.graph_file else {
                    return Ok(vec![]);
                };

                let mut ctx = FileVectorStoreCtx::default();

                let graph = self
                    .sstable_store
                    .get_hnsw_graph(graph_file, &mut ctx.stats)
                    .await?;

                let vector_store =
                    FileVectorStore::new_for_reader(hnsw_flat, self.sstable_store.clone());
                let (items, stats) = nearest::<O, M, _>(
                    &vector_store,
                    &mut ctx,
                    &*graph,
                    target,
                    on_nearest_item_fn,
                    options.hnsw_ef_search,
                    options.top_n,
                )
                .await?;
                ctx.stats.report(table_id, "hnsw_read", self.stats());
                report_hnsw_stat(
                    self.stats(),
                    table_id,
                    "hnsw_read",
                    options.top_n,
                    options.hnsw_ef_search,
                    [stats],
                );
                Ok(items)
            }
        }
    }
}