risingwave_storage/hummock/store/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::Included;
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36    PkPrefixTableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
39use risingwave_pb::hummock::LevelType;
40use sync_point::sync_point;
41use tracing::warn;
42
43use crate::error::StorageResult;
44use crate::hummock::event_handler::LocalInstanceId;
45use crate::hummock::iterator::change_log::ChangeLogIterator;
46use crate::hummock::iterator::{
47    BackwardUserIterator, HummockIterator, IteratorFactory, MergeIterator, UserIterator,
48};
49use crate::hummock::local_version::pinned_version::PinnedVersion;
50use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
51use crate::hummock::sstable_store::SstableStoreRef;
52use crate::hummock::utils::{
53    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
54    search_sst_idx,
55};
56use crate::hummock::{
57    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
58    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
59    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
60    hit_sstable_bloom_filter,
61};
62use crate::mem_table::{
63    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
64};
65use crate::monitor::{
66    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
67};
68use crate::store::{ReadLogOptions, ReadOptions, gen_min_epoch};
69
70pub type CommittedVersion = PinnedVersion;
71
/// Description of a batch of uncommitted SSTs kept in the staging version.
///
/// Data not committed to Hummock is called staging data. There are two types of staging data:
/// - Immutable memtable: data that has been written into local state store but not persisted.
/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
///   hummock version.
///
/// This struct describes the second type.
#[derive(Clone, Debug, PartialEq)]
pub struct StagingSstableInfo {
    /// SSTs of this batch. Newer data comes first.
    sstable_infos: Vec<LocalSstableInfo>,
    /// NOTE(review): presumably the SSTs that store old values of the same batch — confirm
    /// against the uploader that builds this struct.
    old_value_sstable_infos: Vec<LocalSstableInfo>,
    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
    /// The field must not be empty.
    epochs: Vec<HummockEpoch>,
    /// Ids of the immutable memtables covered by this batch, grouped by local instance.
    /// Within each list the newer data is at the front.
    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
    /// NOTE(review): presumably the total size of the covered immutable memtables — confirm.
    imm_size: usize,
}
89
impl StagingSstableInfo {
    /// Builds a `StagingSstableInfo` from its parts.
    ///
    /// # Panics
    ///
    /// Panics if `epochs` is not sorted from the higher epoch to the lower epoch.
    pub fn new(
        sstable_infos: Vec<LocalSstableInfo>,
        old_value_sstable_infos: Vec<LocalSstableInfo>,
        epochs: Vec<HummockEpoch>,
        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
        imm_size: usize,
    ) -> Self {
        // the epochs are sorted from higher epoch to lower epoch
        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
        Self {
            sstable_infos,
            old_value_sstable_infos,
            epochs,
            imm_ids,
            imm_size,
        }
    }

    /// Returns the SSTs of this batch, newer data first.
    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.sstable_infos
    }

    /// Returns the `old_value_sstable_infos` field.
    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
        &self.old_value_sstable_infos
    }

    /// Returns the `imm_size` field.
    pub fn imm_size(&self) -> usize {
        self.imm_size
    }

    /// Returns the epochs included in this batch, newer epoch first.
    pub fn epochs(&self) -> &Vec<HummockEpoch> {
        &self.epochs
    }

    /// Returns the covered immutable memtable ids, grouped by local instance id.
    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
        &self.imm_ids
    }
}
129
/// A single piece of staging data that can be added to a read version.
#[derive(Clone)]
pub enum StagingData {
    /// An immutable memtable.
    ImmMem(ImmutableMemtable),
    /// A batch of uncommitted SSTs, shared through an `Arc`.
    Sst(Arc<StagingSstableInfo>),
}
135
/// An update that can be applied to a `HummockReadVersion` through `HummockReadVersion::update`.
pub enum VersionUpdate {
    /// a new staging data entry will be added.
    Staging(StagingData),
    /// A new committed version that replaces the current committed version.
    CommittedSnapshot(CommittedVersion),
    /// New watermarks written on the table at `epoch`.
    NewTableWatermark {
        direction: WatermarkDirection,
        epoch: HummockEpoch,
        vnode_watermarks: Vec<VnodeWatermark>,
        watermark_type: WatermarkSerdeType,
    },
}
147
/// The staging (not yet committed) data of a read version: immutable memtables and
/// uncommitted SSTs.
#[derive(Clone)]
pub struct StagingVersion {
    /// Immutable memtables. Newer data comes first.
    // Note: Currently, building imm and writing to staging version is not atomic, and therefore
    // imm of smaller batch id may be added later than one with greater batch id
    pub imm: VecDeque<ImmutableMemtable>,

    /// Uncommitted SST batches. Newer data comes first.
    pub sst: VecDeque<Arc<StagingSstableInfo>>,
}
158
159impl StagingVersion {
160    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
161    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
162    pub fn prune_overlap<'a>(
163        &'a self,
164        max_epoch_inclusive: HummockEpoch,
165        table_id: TableId,
166        table_key_range: &'a TableKeyRange,
167    ) -> (
168        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
169        impl Iterator<Item = &'a SstableInfo> + 'a,
170    ) {
171        let (left, right) = table_key_range;
172        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
173        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
174        let overlapped_imms = self.imm.iter().filter(move |imm| {
175            // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
176            imm.min_epoch() <= max_epoch_inclusive
177                && imm.table_id == table_id
178                && range_overlap(
179                    &(left, right),
180                    &imm.start_table_key(),
181                    Included(&imm.end_table_key()),
182                )
183        });
184
185        // TODO: Remove duplicate sst based on sst id
186        let overlapped_ssts = self
187            .sst
188            .iter()
189            .filter(move |staging_sst| {
190                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
191                sst_max_epoch <= max_epoch_inclusive
192            })
193            .flat_map(move |staging_sst| {
194                // TODO: sstable info should be concat-able after each streaming table owns a read
195                // version. May use concat sstable iter instead in some cases.
196                staging_sst
197                    .sstable_infos
198                    .iter()
199                    .map(|sstable| &sstable.sst_info)
200                    .filter(move |sstable: &&SstableInfo| {
201                        filter_single_sst(sstable, table_id, table_key_range)
202                    })
203            });
204        (overlapped_imms, overlapped_ssts)
205    }
206
207    pub fn is_empty(&self) -> bool {
208        self.imm.is_empty() && self.sst.is_empty()
209    }
210}
211
#[derive(Clone)]
/// A container of information required for reading from hummock.
pub struct HummockReadVersion {
    /// Id of the table this read version belongs to.
    table_id: TableId,
    /// Id of the local instance that owns this read version. It is used to pick this
    /// instance's imm ids out of a `StagingSstableInfo`.
    instance_id: LocalInstanceId,

    /// Local version for staging data.
    staging: StagingVersion,

    /// Remote version for committed data.
    committed: CommittedVersion,

    /// Indicate if this is replicated. If it is, we should ignore it during
    /// global state store read, to avoid duplicated results.
    /// Otherwise for local state store, it is fine, since we will only see the
    /// `ReadVersion` of that local state store.
    is_replicated: bool,

    /// Index of the pk-prefix table watermarks of the table, if there are any.
    table_watermarks: Option<PkPrefixTableWatermarksIndex>,

    // Vnode bitmap corresponding to the read version
    // It will be initialized after local state store init
    vnodes: Arc<Bitmap>,
}
236
237impl HummockReadVersion {
238    pub fn new_with_replication_option(
239        table_id: TableId,
240        instance_id: LocalInstanceId,
241        committed_version: CommittedVersion,
242        is_replicated: bool,
243        vnodes: Arc<Bitmap>,
244    ) -> Self {
245        // before build `HummockReadVersion`, we need to get the a initial version which obtained
246        // from meta. want this initialization after version is initialized (now with
247        // notification), so add a assert condition to guarantee correct initialization order
248        assert!(committed_version.is_valid());
249        Self {
250            table_id,
251            instance_id,
252            table_watermarks: {
253                match committed_version.table_watermarks.get(&table_id) {
254                    Some(table_watermarks) => match table_watermarks.watermark_type {
255                        WatermarkSerdeType::PkPrefix => {
256                            Some(PkPrefixTableWatermarksIndex::new_committed(
257                                table_watermarks.clone(),
258                                committed_version
259                                    .state_table_info
260                                    .info()
261                                    .get(&table_id)
262                                    .expect("should exist")
263                                    .committed_epoch,
264                            ))
265                        }
266
267                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
268                    },
269                    None => None,
270                }
271            },
272            staging: StagingVersion {
273                imm: VecDeque::default(),
274                sst: VecDeque::default(),
275            },
276
277            committed: committed_version,
278
279            is_replicated,
280            vnodes,
281        }
282    }
283
284    pub fn new(
285        table_id: TableId,
286        instance_id: LocalInstanceId,
287        committed_version: CommittedVersion,
288        vnodes: Arc<Bitmap>,
289    ) -> Self {
290        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
291    }
292
293    pub fn table_id(&self) -> TableId {
294        self.table_id
295    }
296
297    /// Updates the read version with `VersionUpdate`.
298    /// There will be three data types to be processed
299    /// `VersionUpdate::Staging`
300    ///     - `StagingData::ImmMem` -> Insert into memory's `staging_imm`
301    ///     - `StagingData::Sst` -> Update the sst to memory's `staging_sst` and remove the
302    ///       corresponding `staging_imms` according to the `batch_id`
303    /// `VersionUpdate::CommittedDelta` -> Unimplemented yet
304    /// `VersionUpdate::CommittedSnapshot` -> Update `committed_version` , and clean up related
305    /// `staging_sst` and `staging_imm` in memory according to epoch
306    pub fn update(&mut self, info: VersionUpdate) {
307        match info {
308            VersionUpdate::Staging(staging) => match staging {
309                // TODO: add a check to ensure that the added batch id of added imm is greater than
310                // the batch id of imm at the front
311                StagingData::ImmMem(imm) => {
312                    if let Some(item) = self.staging.imm.front() {
313                        // check batch_id order from newest to old
314                        debug_assert!(item.batch_id() < imm.batch_id());
315                    }
316
317                    self.staging.imm.push_front(imm)
318                }
319                StagingData::Sst(staging_sst_ref) => {
320                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
321                        warn!(
322                            instance_id = self.instance_id,
323                            "no related imm in sst input"
324                        );
325                        return;
326                    };
327
328                    // old data comes first
329                    for imm_id in imms.iter().rev() {
330                        let check_err = match self.staging.imm.pop_back() {
331                            None => Some("empty".to_owned()),
332                            Some(prev_imm_id) => {
333                                if prev_imm_id.batch_id() == *imm_id {
334                                    None
335                                } else {
336                                    Some(format!(
337                                        "miss match id {} {}",
338                                        prev_imm_id.batch_id(),
339                                        *imm_id
340                                    ))
341                                }
342                            }
343                        };
344                        assert!(
345                            check_err.is_none(),
346                            "should be valid staging_sst.size {},
347                                    staging_sst.imm_ids {:?},
348                                    staging_sst.epochs {:?},
349                                    local_imm_ids {:?},
350                                    instance_id {}
351                                    check_err {:?}",
352                            staging_sst_ref.imm_size,
353                            staging_sst_ref.imm_ids,
354                            staging_sst_ref.epochs,
355                            self.staging
356                                .imm
357                                .iter()
358                                .map(|imm| imm.batch_id())
359                                .collect_vec(),
360                            self.instance_id,
361                            check_err
362                        );
363                    }
364
365                    self.staging.sst.push_front(staging_sst_ref);
366                }
367            },
368
369            VersionUpdate::CommittedSnapshot(committed_version) => {
370                if let Some(info) = committed_version
371                    .state_table_info
372                    .info()
373                    .get(&self.table_id)
374                {
375                    let committed_epoch = info.committed_epoch;
376                    self.staging.imm.retain(|imm| {
377                        if self.is_replicated {
378                            imm.min_epoch() > committed_epoch
379                        } else {
380                            assert!(imm.min_epoch() > committed_epoch);
381                            true
382                        }
383                    });
384
385                    self.staging.sst.retain(|sst| {
386                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
387                    });
388
389                    // check epochs.last() > MCE
390                    assert!(self.staging.sst.iter().all(|sst| {
391                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
392                    }));
393
394                    if let Some(committed_watermarks) =
395                        committed_version.table_watermarks.get(&self.table_id)
396                        && let WatermarkSerdeType::PkPrefix = committed_watermarks.watermark_type
397                    {
398                        if let Some(watermark_index) = &mut self.table_watermarks {
399                            watermark_index.apply_committed_watermarks(
400                                committed_watermarks.clone(),
401                                committed_epoch,
402                            );
403                        } else {
404                            self.table_watermarks =
405                                Some(PkPrefixTableWatermarksIndex::new_committed(
406                                    committed_watermarks.clone(),
407                                    committed_epoch,
408                                ));
409                        }
410                    }
411                }
412
413                self.committed = committed_version;
414            }
415            VersionUpdate::NewTableWatermark {
416                direction,
417                epoch,
418                vnode_watermarks,
419                watermark_type,
420            } => {
421                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
422                if let Some(watermark_index) = &mut self.table_watermarks {
423                    watermark_index.add_epoch_watermark(
424                        epoch,
425                        Arc::from(vnode_watermarks),
426                        direction,
427                    );
428                } else {
429                    self.table_watermarks = Some(PkPrefixTableWatermarksIndex::new(
430                        direction,
431                        epoch,
432                        vnode_watermarks,
433                        self.committed.table_committed_epoch(self.table_id),
434                    ));
435                }
436            }
437        }
438    }
439
440    pub fn staging(&self) -> &StagingVersion {
441        &self.staging
442    }
443
444    pub fn committed(&self) -> &CommittedVersion {
445        &self.committed
446    }
447
448    /// We have assumption that the watermark is increasing monotonically. Therefore,
449    /// here if the upper layer usage has passed an regressed watermark, we should
450    /// filter out the regressed watermark. Currently the kv log store may write
451    /// regressed watermark
452    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
453        if let Some(watermark_index) = &self.table_watermarks {
454            watermark_index.filter_regress_watermarks(watermarks)
455        }
456    }
457
458    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
459        self.table_watermarks
460            .as_ref()
461            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
462    }
463
464    pub fn is_replicated(&self) -> bool {
465        self.is_replicated
466    }
467
468    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
469        std::mem::replace(&mut self.vnodes, vnodes)
470    }
471
472    pub fn contains(&self, vnode: VirtualNode) -> bool {
473        self.vnodes.is_set(vnode.to_index())
474    }
475
476    pub fn vnodes(&self) -> Arc<Bitmap> {
477        self.vnodes.clone()
478    }
479}
480
481pub fn read_filter_for_version(
482    epoch: HummockEpoch,
483    table_id: TableId,
484    mut table_key_range: TableKeyRange,
485    read_version: &RwLock<HummockReadVersion>,
486) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
487    let read_version_guard = read_version.read();
488
489    let committed_version = read_version_guard.committed().clone();
490
491    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
492        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
493    }
494
495    let (imm_iter, sst_iter) =
496        read_version_guard
497            .staging()
498            .prune_overlap(epoch, table_id, &table_key_range);
499
500    let imms = imm_iter.cloned().collect();
501    let ssts = sst_iter.cloned().collect();
502
503    Ok((table_key_range, (imms, ssts, committed_version)))
504}
505
/// Reader that performs point gets and range scans over a `ReadVersionTuple`.
#[derive(Clone)]
pub struct HummockVersionReader {
    /// Store used to access the SSTs that are read.
    sstable_store: SstableStoreRef,

    /// Statistics
    state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// NOTE(review): not used in the visible part of this file; presumably the number of
    /// retries when preloading SST data for iterators — confirm.
    preload_retry_times: usize,
}
514
/// `HummockVersionReader` is used to share the `get` and `iter` implementations between
/// `batch_query` and `streaming_query`.
impl HummockVersionReader {
    /// Creates a reader from its parts.
    pub fn new(
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        preload_retry_times: usize,
    ) -> Self {
        Self {
            sstable_store,
            state_store_metrics,
            preload_retry_times,
        }
    }

    /// Returns the state store metrics used by this reader.
    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
        &self.state_store_metrics
    }
}
534
/// A duration threshold in seconds.
// NOTE(review): the use site is outside the visible part of this file; presumably fetching
// SST meta for an iterator is reported as slow when it takes longer than this — confirm.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
536
537impl HummockVersionReader {
    /// Point lookup of `table_key` in `table_id` at `epoch`.
    ///
    /// The sources in `read_version_tuple` are probed in this order and the first hit wins:
    /// 1. the staging immutable memtables,
    /// 2. the uncommitted (staging) ssts,
    /// 3. the levels of the committed version.
    ///
    /// On a hit, `on_key_value_fn` is called with the full key (carrying the epoch of the
    /// found entry) and the value, and its result is returned. `Ok(None)` is returned when
    /// - no source contains the key,
    /// - the found entry is older than the minimum epoch derived from
    ///   `read_options.retention_seconds`, or
    /// - the found entry has no user value (`into_user_value()` returns `None`).
    ///
    /// # Errors
    ///
    /// Returns the error of reading an sst or the error returned by `on_key_value_fn`.
    ///
    /// # Panics
    ///
    /// Panics if the committed version is not valid.
    pub async fn get<O>(
        &self,
        table_key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
        read_options: ReadOptions,
        read_version_tuple: ReadVersionTuple,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;

        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
        let mut stats_guard = GetLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id);
        let local_stats = &mut stats_guard.local_stats;
        // Assume the key is found; this is reset at the very end when every source missed.
        local_stats.found_key = true;

        // 1. read staging data
        for imm in &imms {
            // skip imms that only hold out-of-date data
            if imm.max_epoch() < min_epoch {
                continue;
            }

            local_stats.staging_imm_get_count += 1;

            if let Some((data, data_epoch)) = get_from_batch(
                imm,
                TableKey(table_key.as_ref()),
                epoch,
                &read_options,
                local_stats,
            ) {
                // The first hit decides the result, even when it is too old to be returned.
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    data.into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v.as_ref(),
                            )
                        })
                        .transpose()?
                });
            }
        }

        // 2. order guarantee: imm -> sst
        // Bloom filter hash of the prefix hint, if one was given.
        let dist_key_hash = read_options
            .prefix_hint
            .as_ref()
            .map(|dist_key| Sstable::hash_for_bloom_filter(dist_key.as_ref(), table_id.table_id()));

        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`,
        // otherwise, the iterator might skip keys with spill offset greater than 0.
        let full_key = FullKey::new_with_gap_epoch(
            table_id,
            TableKey(table_key.clone()),
            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
        );
        for local_sst in &uncommitted_ssts {
            local_stats.staging_sst_get_count += 1;
            if let Some(iter) = get_from_sstable_info(
                self.sstable_store.clone(),
                local_sst,
                full_key.to_ref(),
                &read_options,
                dist_key_hash,
                local_stats,
            )
            .await?
            {
                debug_assert!(iter.is_valid());
                let data_epoch = iter.key().epoch_with_gap;
                return Ok(if data_epoch.pure_epoch() < min_epoch {
                    None
                } else {
                    iter.value()
                        .into_user_value()
                        .map(|v| {
                            on_key_value_fn(
                                FullKey::new_with_gap_epoch(
                                    table_id,
                                    table_key.to_ref(),
                                    data_epoch,
                                ),
                                v,
                            )
                        })
                        .transpose()?
                });
            }
        }
        // Range that contains exactly `table_key`, used to prune the overlapping levels.
        let single_table_key_range = table_key.clone()..=table_key.clone();
        // 3. read from committed_version sst file
        // Because SST meta records encoded key range,
        // the filter key needs to be encoded as well.
        assert!(committed_version.is_valid());
        for level in committed_version.levels(table_id) {
            if level.table_infos.is_empty() {
                continue;
            }

            match level.level_type {
                LevelType::Overlapping | LevelType::Unspecified => {
                    // Ssts of an overlapping level may all contain the key, so every sst that
                    // survives pruning is probed.
                    let sstable_infos = prune_overlapping_ssts(
                        &level.table_infos,
                        table_id,
                        &single_table_key_range,
                    );
                    for sstable_info in sstable_infos {
                        local_stats.overlapping_get_count += 1;
                        if let Some(iter) = get_from_sstable_info(
                            self.sstable_store.clone(),
                            sstable_info,
                            full_key.to_ref(),
                            &read_options,
                            dist_key_hash,
                            local_stats,
                        )
                        .await?
                        {
                            debug_assert!(iter.is_valid());
                            let data_epoch = iter.key().epoch_with_gap;
                            return Ok(if data_epoch.pure_epoch() < min_epoch {
                                None
                            } else {
                                iter.value()
                                    .into_user_value()
                                    .map(|v| {
                                        on_key_value_fn(
                                            FullKey::new_with_gap_epoch(
                                                table_id,
                                                table_key.to_ref(),
                                                data_epoch,
                                            ),
                                            v,
                                        )
                                    })
                                    .transpose()?
                            });
                        }
                    }
                }
                LevelType::Nonoverlapping => {
                    // At most one sst of a non-overlapping level can contain the key: the one
                    // right before the index returned by `search_sst_idx`.
                    let mut table_info_idx =
                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
                    // index 0 means there is no candidate sst in this level
                    if table_info_idx == 0 {
                        continue;
                    }
                    table_info_idx = table_info_idx.saturating_sub(1);
                    let ord = level.table_infos[table_info_idx]
                        .key_range
                        .compare_right_with_user_key(full_key.user_key.as_ref());
                    // the case that the key falls into the gap between two ssts
                    if ord == Ordering::Less {
                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
                        continue;
                    }

                    local_stats.non_overlapping_get_count += 1;
                    if let Some(iter) = get_from_sstable_info(
                        self.sstable_store.clone(),
                        &level.table_infos[table_info_idx],
                        full_key.to_ref(),
                        &read_options,
                        dist_key_hash,
                        local_stats,
                    )
                    .await?
                    {
                        debug_assert!(iter.is_valid());
                        let data_epoch = iter.key().epoch_with_gap;
                        return Ok(if data_epoch.pure_epoch() < min_epoch {
                            None
                        } else {
                            iter.value()
                                .into_user_value()
                                .map(|v| {
                                    on_key_value_fn(
                                        FullKey::new_with_gap_epoch(
                                            table_id,
                                            table_key.to_ref(),
                                            data_epoch,
                                        ),
                                        v,
                                    )
                                })
                                .transpose()?
                        });
                    }
                }
            }
        }
        // No source contained the key.
        stats_guard.local_stats.found_key = false;
        Ok(None)
    }
739
740    pub async fn iter(
741        &self,
742        table_key_range: TableKeyRange,
743        epoch: u64,
744        table_id: TableId,
745        read_options: ReadOptions,
746        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
747    ) -> StorageResult<HummockStorageIterator> {
748        self.iter_with_memtable(
749            table_key_range,
750            epoch,
751            table_id,
752            read_options,
753            read_version_tuple,
754            None,
755        )
756        .await
757    }
758
759    pub async fn iter_with_memtable<'b>(
760        &self,
761        table_key_range: TableKeyRange,
762        epoch: u64,
763        table_id: TableId,
764        read_options: ReadOptions,
765        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
766        memtable_iter: Option<MemTableHummockIterator<'b>>,
767    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
768        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
769        let user_key_range = (
770            user_key_range_ref.0.map(|key| key.cloned()),
771            user_key_range_ref.1.map(|key| key.cloned()),
772        );
773        let mut factory = ForwardIteratorFactory::default();
774        let mut local_stats = StoreLocalStatistic::default();
775        let (imms, uncommitted_ssts, committed) = read_version_tuple;
776        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
777        self.iter_inner(
778            table_key_range,
779            epoch,
780            table_id,
781            read_options,
782            imms,
783            uncommitted_ssts,
784            &committed,
785            &mut local_stats,
786            &mut factory,
787        )
788        .await?;
789        let merge_iter = factory.build(memtable_iter);
790        // the epoch_range left bound for iterator read
791        let mut user_iter = UserIterator::new(
792            merge_iter,
793            user_key_range,
794            epoch,
795            min_epoch,
796            Some(committed),
797        );
798        user_iter.rewind().await?;
799        Ok(HummockStorageIteratorInner::new(
800            user_iter,
801            self.state_store_metrics.clone(),
802            table_id,
803            local_stats,
804        ))
805    }
806
807    pub async fn rev_iter<'b>(
808        &self,
809        table_key_range: TableKeyRange,
810        epoch: u64,
811        table_id: TableId,
812        read_options: ReadOptions,
813        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
814        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
815    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
816        let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
817        let user_key_range = (
818            user_key_range_ref.0.map(|key| key.cloned()),
819            user_key_range_ref.1.map(|key| key.cloned()),
820        );
821        let mut factory = BackwardIteratorFactory::default();
822        let mut local_stats = StoreLocalStatistic::default();
823        let (imms, uncommitted_ssts, committed) = read_version_tuple;
824        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
825        self.iter_inner(
826            table_key_range,
827            epoch,
828            table_id,
829            read_options,
830            imms,
831            uncommitted_ssts,
832            &committed,
833            &mut local_stats,
834            &mut factory,
835        )
836        .await?;
837        let merge_iter = factory.build(memtable_iter);
838        // the epoch_range left bound for iterator read
839        let mut user_iter = BackwardUserIterator::new(
840            merge_iter,
841            user_key_range,
842            epoch,
843            min_epoch,
844            Some(committed),
845        );
846        user_iter.rewind().await?;
847        Ok(HummockStorageRevIteratorInner::new(
848            user_iter,
849            self.state_store_metrics.clone(),
850            table_id,
851            local_stats,
852        ))
853    }
854
855    async fn iter_inner<F: IteratorFactory>(
856        &self,
857        table_key_range: TableKeyRange,
858        epoch: u64,
859        table_id: TableId,
860        read_options: ReadOptions,
861        imms: Vec<ImmutableMemtable>,
862        uncommitted_ssts: Vec<SstableInfo>,
863        committed: &CommittedVersion,
864        local_stats: &mut StoreLocalStatistic,
865        factory: &mut F,
866    ) -> StorageResult<()> {
867        local_stats.staging_imm_iter_count = imms.len() as u64;
868        for imm in imms {
869            factory.add_batch_iter(imm);
870        }
871
872        // 2. build iterator from committed
873        // Because SST meta records encoded key range,
874        // the filter key range needs to be encoded as well.
875        let user_key_range = bound_table_key_range(table_id, &table_key_range);
876        let user_key_range_ref = (
877            user_key_range.0.as_ref().map(UserKey::as_ref),
878            user_key_range.1.as_ref().map(UserKey::as_ref),
879        );
880        let mut staging_sst_iter_count = 0;
881        // encode once
882        let bloom_filter_prefix_hash = read_options
883            .prefix_hint
884            .as_ref()
885            .map(|hint| Sstable::hash_for_bloom_filter(hint, table_id.table_id()));
886        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
887        if read_options.prefetch_options.prefetch {
888            sst_read_options.must_iterated_end_user_key =
889                Some(user_key_range.1.map(|key| key.cloned()));
890            sst_read_options.max_preload_retry_times = self.preload_retry_times;
891        }
892        let sst_read_options = Arc::new(sst_read_options);
893        for sstable_info in &uncommitted_ssts {
894            let table_holder = self
895                .sstable_store
896                .sstable(sstable_info, local_stats)
897                .await?;
898
899            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
900                if !hit_sstable_bloom_filter(
901                    &table_holder,
902                    &user_key_range_ref,
903                    *prefix_hash,
904                    local_stats,
905                ) {
906                    continue;
907                }
908            }
909
910            staging_sst_iter_count += 1;
911            factory.add_staging_sst_iter(F::SstableIteratorType::create(
912                table_holder,
913                self.sstable_store.clone(),
914                sst_read_options.clone(),
915                sstable_info,
916            ));
917        }
918        local_stats.staging_sst_iter_count = staging_sst_iter_count;
919
920        let timer = Instant::now();
921
922        for level in committed.levels(table_id) {
923            if level.table_infos.is_empty() {
924                continue;
925            }
926
927            if level.level_type == LevelType::Nonoverlapping {
928                let mut table_infos = prune_nonoverlapping_ssts(
929                    &level.table_infos,
930                    user_key_range_ref,
931                    table_id.table_id(),
932                )
933                .peekable();
934
935                if table_infos.peek().is_none() {
936                    continue;
937                }
938                let sstable_infos = table_infos.cloned().collect_vec();
939                if sstable_infos.len() > 1 {
940                    factory.add_concat_sst_iter(
941                        sstable_infos,
942                        self.sstable_store.clone(),
943                        sst_read_options.clone(),
944                    );
945                    local_stats.non_overlapping_iter_count += 1;
946                } else {
947                    let sstable = self
948                        .sstable_store
949                        .sstable(&sstable_infos[0], local_stats)
950                        .await?;
951
952                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
953                        if !hit_sstable_bloom_filter(
954                            &sstable,
955                            &user_key_range_ref,
956                            *dist_hash,
957                            local_stats,
958                        ) {
959                            continue;
960                        }
961                    }
962                    // Since there is only one sst to be included for the current non-overlapping
963                    // level, there is no need to create a ConcatIterator on it.
964                    // We put the SstableIterator in `overlapping_iters` just for convenience since
965                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
966                    // it in `non_overlapping_iter_count`.
967                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
968                        sstable,
969                        self.sstable_store.clone(),
970                        sst_read_options.clone(),
971                        &sstable_infos[0],
972                    ));
973                    local_stats.non_overlapping_iter_count += 1;
974                }
975            } else {
976                let table_infos =
977                    prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range);
978                // Overlapping
979                let fetch_meta_req = table_infos.rev().collect_vec();
980                if fetch_meta_req.is_empty() {
981                    continue;
982                }
983                for sstable_info in fetch_meta_req {
984                    let sstable = self
985                        .sstable_store
986                        .sstable(sstable_info, local_stats)
987                        .await?;
988                    assert_eq!(sstable_info.object_id, sstable.id);
989                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
990                        if !hit_sstable_bloom_filter(
991                            &sstable,
992                            &user_key_range_ref,
993                            *dist_hash,
994                            local_stats,
995                        ) {
996                            continue;
997                        }
998                    }
999                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
1000                        sstable,
1001                        self.sstable_store.clone(),
1002                        sst_read_options.clone(),
1003                        sstable_info,
1004                    ));
1005                    local_stats.overlapping_iter_count += 1;
1006                }
1007            }
1008        }
1009        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
1010        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
1011            let table_id_string = table_id.to_string();
1012            tracing::warn!(
1013                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
1014                table_id_string,
1015                epoch,
1016                fetch_meta_duration_sec,
1017                local_stats.cache_meta_block_miss
1018            );
1019            self.state_store_metrics
1020                .iter_slow_fetch_meta_cache_unhits
1021                .set(local_stats.cache_meta_block_miss as i64);
1022        }
1023        Ok(())
1024    }
1025
1026    pub async fn iter_log(
1027        &self,
1028        version: PinnedVersion,
1029        epoch_range: (u64, u64),
1030        key_range: TableKeyRange,
1031        options: ReadLogOptions,
1032    ) -> HummockResult<ChangeLogIterator> {
1033        let change_log = {
1034            let table_change_logs = version.table_change_log_read_lock();
1035            if let Some(change_log) = table_change_logs.get(&options.table_id) {
1036                change_log.filter_epoch(epoch_range).cloned().collect_vec()
1037            } else {
1038                Vec::new()
1039            }
1040        };
1041
1042        if let Some(max_epoch_change_log) = change_log.last() {
1043            let (_, max_epoch) = epoch_range;
1044            if !max_epoch_change_log.epochs().contains(&max_epoch) {
1045                warn!(
1046                    max_epoch,
1047                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs()).collect_vec(),
1048                    table_id = options.table_id.table_id,
1049                    "max_epoch does not exist"
1050                );
1051            }
1052        }
1053        let read_options = Arc::new(SstableIteratorReadOptions {
1054            cache_policy: Default::default(),
1055            must_iterated_end_user_key: None,
1056            max_preload_retry_times: 0,
1057            prefetch_for_large_query: false,
1058        });
1059
1060        async fn make_iter(
1061            sstable_infos: impl Iterator<Item = &SstableInfo>,
1062            sstable_store: &SstableStoreRef,
1063            read_options: Arc<SstableIteratorReadOptions>,
1064            local_stat: &mut StoreLocalStatistic,
1065        ) -> HummockResult<MergeIterator<SstableIterator>> {
1066            let iters = try_join_all(sstable_infos.map(|sstable_info| {
1067                let sstable_store = sstable_store.clone();
1068                let read_options = read_options.clone();
1069                async move {
1070                    let mut local_stat = StoreLocalStatistic::default();
1071                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1072                    Ok::<_, HummockError>((
1073                        SstableIterator::new(
1074                            table_holder,
1075                            sstable_store,
1076                            read_options,
1077                            sstable_info,
1078                        ),
1079                        local_stat,
1080                    ))
1081                }
1082            }))
1083            .await?;
1084            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1085                |(iter, stats)| {
1086                    local_stat.add(&stats);
1087                    iter
1088                },
1089            )))
1090        }
1091
1092        let mut local_stat = StoreLocalStatistic::default();
1093
1094        let new_value_iter = make_iter(
1095            change_log
1096                .iter()
1097                .flat_map(|log| log.new_value.iter())
1098                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1099            &self.sstable_store,
1100            read_options.clone(),
1101            &mut local_stat,
1102        )
1103        .await?;
1104        let old_value_iter = make_iter(
1105            change_log
1106                .iter()
1107                .flat_map(|log| log.old_value.iter())
1108                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1109            &self.sstable_store,
1110            read_options.clone(),
1111            &mut local_stat,
1112        )
1113        .await?;
1114        ChangeLogIterator::new(
1115            epoch_range,
1116            key_range,
1117            new_value_iter,
1118            old_value_iter,
1119            options.table_id,
1120            IterLocalMetricsGuard::new(
1121                self.state_store_metrics.clone(),
1122                options.table_id,
1123                local_stat,
1124            ),
1125        )
1126        .await
1127    }
1128}