risingwave_storage/hummock/store/
version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::vec_deque::VecDeque;
18use std::ops::Bound::Included;
19use std::sync::Arc;
20use std::time::Instant;
21
22use bytes::Bytes;
23use futures::future::try_join_all;
24use itertools::Itertools;
25use parking_lot::RwLock;
26use risingwave_common::bitmap::Bitmap;
27use risingwave_common::catalog::TableId;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::MAX_SPILL_TIMES;
30use risingwave_hummock_sdk::key::{
31    FullKey, TableKey, TableKeyRange, UserKey, bound_table_key_range,
32};
33use risingwave_hummock_sdk::key_range::KeyRangeCommon;
34use risingwave_hummock_sdk::sstable_info::SstableInfo;
35use risingwave_hummock_sdk::table_watermark::{
36    TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
37};
38use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo};
39use risingwave_pb::hummock::LevelType;
40use sync_point::sync_point;
41use tracing::warn;
42
43use crate::error::StorageResult;
44use crate::hummock::event_handler::LocalInstanceId;
45use crate::hummock::iterator::change_log::ChangeLogIterator;
46use crate::hummock::iterator::{
47    BackwardUserIterator, IteratorFactory, MergeIterator, UserIterator,
48};
49use crate::hummock::local_version::pinned_version::PinnedVersion;
50use crate::hummock::sstable::{SstableIteratorReadOptions, SstableIteratorType};
51use crate::hummock::sstable_store::SstableStoreRef;
52use crate::hummock::utils::{
53    filter_single_sst, prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap,
54    search_sst_idx,
55};
56use crate::hummock::{
57    BackwardIteratorFactory, ForwardIteratorFactory, HummockError, HummockResult,
58    HummockStorageIterator, HummockStorageIteratorInner, HummockStorageRevIteratorInner,
59    ReadVersionTuple, Sstable, SstableIterator, get_from_batch, get_from_sstable_info,
60    hit_sstable_bloom_filter,
61};
62use crate::mem_table::{
63    ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
64};
65use crate::monitor::{
66    GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
67};
68use crate::store::{ReadLogOptions, ReadOptions, StateStoreKeyedRow, gen_min_epoch};
69
70pub type CommittedVersion = PinnedVersion;
71
72/// Data not committed to Hummock. There are two types of staging data:
73/// - Immutable memtable: data that has been written into local state store but not persisted.
74/// - Uncommitted SST: data that has been uploaded to persistent storage but not committed to
75///   hummock version.
76
77#[derive(Clone, Debug, PartialEq)]
78pub struct StagingSstableInfo {
79    // newer data comes first
80    sstable_infos: Vec<LocalSstableInfo>,
81    old_value_sstable_infos: Vec<LocalSstableInfo>,
82    /// Epochs whose data are included in the Sstable. The newer epoch comes first.
83    /// The field must not be empty.
84    epochs: Vec<HummockEpoch>,
85    // newer data at the front
86    imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
87    imm_size: usize,
88}
89
90impl StagingSstableInfo {
91    pub fn new(
92        sstable_infos: Vec<LocalSstableInfo>,
93        old_value_sstable_infos: Vec<LocalSstableInfo>,
94        epochs: Vec<HummockEpoch>,
95        imm_ids: HashMap<LocalInstanceId, Vec<ImmId>>,
96        imm_size: usize,
97    ) -> Self {
98        // the epochs are sorted from higher epoch to lower epoch
99        assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2 <= epoch1));
100        Self {
101            sstable_infos,
102            old_value_sstable_infos,
103            epochs,
104            imm_ids,
105            imm_size,
106        }
107    }
108
109    pub fn sstable_infos(&self) -> &Vec<LocalSstableInfo> {
110        &self.sstable_infos
111    }
112
113    pub fn old_value_sstable_infos(&self) -> &Vec<LocalSstableInfo> {
114        &self.old_value_sstable_infos
115    }
116
117    pub fn imm_size(&self) -> usize {
118        self.imm_size
119    }
120
121    pub fn epochs(&self) -> &Vec<HummockEpoch> {
122        &self.epochs
123    }
124
125    pub fn imm_ids(&self) -> &HashMap<LocalInstanceId, Vec<ImmId>> {
126        &self.imm_ids
127    }
128}
129
130#[derive(Clone)]
131pub enum StagingData {
132    ImmMem(ImmutableMemtable),
133    Sst(Arc<StagingSstableInfo>),
134}
135
136pub enum VersionUpdate {
137    /// a new staging data entry will be added.
138    Staging(StagingData),
139    CommittedSnapshot(CommittedVersion),
140    NewTableWatermark {
141        direction: WatermarkDirection,
142        epoch: HummockEpoch,
143        vnode_watermarks: Vec<VnodeWatermark>,
144        watermark_type: WatermarkSerdeType,
145    },
146}
147
148#[derive(Clone)]
149pub struct StagingVersion {
150    // newer data comes first
151    // Note: Currently, building imm and writing to staging version is not atomic, and therefore
152    // imm of smaller batch id may be added later than one with greater batch id
153    pub imm: VecDeque<ImmutableMemtable>,
154
155    // newer data comes first
156    pub sst: VecDeque<Arc<StagingSstableInfo>>,
157}
158
159impl StagingVersion {
160    /// Get the overlapping `imm`s and `sst`s that overlap respectively with `table_key_range` and
161    /// the user key range derived from `table_id`, `epoch` and `table_key_range`.
162    pub fn prune_overlap<'a>(
163        &'a self,
164        max_epoch_inclusive: HummockEpoch,
165        table_id: TableId,
166        table_key_range: &'a TableKeyRange,
167    ) -> (
168        impl Iterator<Item = &'a ImmutableMemtable> + 'a,
169        impl Iterator<Item = &'a SstableInfo> + 'a,
170    ) {
171        let (left, right) = table_key_range;
172        let left = left.as_ref().map(|key| TableKey(key.0.as_ref()));
173        let right = right.as_ref().map(|key| TableKey(key.0.as_ref()));
174        let overlapped_imms = self.imm.iter().filter(move |imm| {
175            // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive]
176            imm.min_epoch() <= max_epoch_inclusive
177                && imm.table_id == table_id
178                && range_overlap(
179                    &(left, right),
180                    &imm.start_table_key(),
181                    Included(&imm.end_table_key()),
182                )
183        });
184
185        // TODO: Remove duplicate sst based on sst id
186        let overlapped_ssts = self
187            .sst
188            .iter()
189            .filter(move |staging_sst| {
190                let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty");
191                sst_max_epoch <= max_epoch_inclusive
192            })
193            .flat_map(move |staging_sst| {
194                // TODO: sstable info should be concat-able after each streaming table owns a read
195                // version. May use concat sstable iter instead in some cases.
196                staging_sst
197                    .sstable_infos
198                    .iter()
199                    .map(|sstable| &sstable.sst_info)
200                    .filter(move |sstable: &&SstableInfo| {
201                        filter_single_sst(sstable, table_id, table_key_range)
202                    })
203            });
204        (overlapped_imms, overlapped_ssts)
205    }
206
207    pub fn is_empty(&self) -> bool {
208        self.imm.is_empty() && self.sst.is_empty()
209    }
210}
211
212#[derive(Clone)]
213/// A container of information required for reading from hummock.
214pub struct HummockReadVersion {
215    table_id: TableId,
216    instance_id: LocalInstanceId,
217
218    /// Local version for staging data.
219    staging: StagingVersion,
220
221    /// Remote version for committed data.
222    committed: CommittedVersion,
223
224    /// Indicate if this is replicated. If it is, we should ignore it during
225    /// global state store read, to avoid duplicated results.
226    /// Otherwise for local state store, it is fine, see we will see the
227    /// `ReadVersion` just for that local state store.
228    is_replicated: bool,
229
230    table_watermarks: Option<TableWatermarksIndex>,
231
232    // Vnode bitmap corresponding to the read version
233    // It will be initialized after local state store init
234    vnodes: Arc<Bitmap>,
235}
236
237impl HummockReadVersion {
238    pub fn new_with_replication_option(
239        table_id: TableId,
240        instance_id: LocalInstanceId,
241        committed_version: CommittedVersion,
242        is_replicated: bool,
243        vnodes: Arc<Bitmap>,
244    ) -> Self {
245        // before build `HummockReadVersion`, we need to get the a initial version which obtained
246        // from meta. want this initialization after version is initialized (now with
247        // notification), so add a assert condition to guarantee correct initialization order
248        assert!(committed_version.is_valid());
249        Self {
250            table_id,
251            instance_id,
252            table_watermarks: {
253                match committed_version.table_watermarks.get(&table_id) {
254                    Some(table_watermarks) => match table_watermarks.watermark_type {
255                        WatermarkSerdeType::PkPrefix => Some(TableWatermarksIndex::new_committed(
256                            table_watermarks.clone(),
257                            committed_version
258                                .state_table_info
259                                .info()
260                                .get(&table_id)
261                                .expect("should exist")
262                                .committed_epoch,
263                        )),
264
265                        WatermarkSerdeType::NonPkPrefix => None, /* do not fill the non-pk prefix watermark to index */
266                    },
267                    None => None,
268                }
269            },
270            staging: StagingVersion {
271                imm: VecDeque::default(),
272                sst: VecDeque::default(),
273            },
274
275            committed: committed_version,
276
277            is_replicated,
278            vnodes,
279        }
280    }
281
282    pub fn new(
283        table_id: TableId,
284        instance_id: LocalInstanceId,
285        committed_version: CommittedVersion,
286        vnodes: Arc<Bitmap>,
287    ) -> Self {
288        Self::new_with_replication_option(table_id, instance_id, committed_version, false, vnodes)
289    }
290
291    pub fn table_id(&self) -> TableId {
292        self.table_id
293    }
294
295    /// Updates the read version with `VersionUpdate`.
296    /// There will be three data types to be processed
297    /// `VersionUpdate::Staging`
298    ///     - `StagingData::ImmMem` -> Insert into memory's `staging_imm`
299    ///     - `StagingData::Sst` -> Update the sst to memory's `staging_sst` and remove the
300    ///       corresponding `staging_imms` according to the `batch_id`
301    /// `VersionUpdate::CommittedDelta` -> Unimplemented yet
302    /// `VersionUpdate::CommittedSnapshot` -> Update `committed_version` , and clean up related
303    /// `staging_sst` and `staging_imm` in memory according to epoch
304    pub fn update(&mut self, info: VersionUpdate) {
305        match info {
306            VersionUpdate::Staging(staging) => match staging {
307                // TODO: add a check to ensure that the added batch id of added imm is greater than
308                // the batch id of imm at the front
309                StagingData::ImmMem(imm) => {
310                    if let Some(item) = self.staging.imm.front() {
311                        // check batch_id order from newest to old
312                        debug_assert!(item.batch_id() < imm.batch_id());
313                    }
314
315                    self.staging.imm.push_front(imm)
316                }
317                StagingData::Sst(staging_sst_ref) => {
318                    let Some(imms) = staging_sst_ref.imm_ids.get(&self.instance_id) else {
319                        warn!(
320                            instance_id = self.instance_id,
321                            "no related imm in sst input"
322                        );
323                        return;
324                    };
325
326                    // old data comes first
327                    for imm_id in imms.iter().rev() {
328                        let check_err = match self.staging.imm.pop_back() {
329                            None => Some("empty".to_owned()),
330                            Some(prev_imm_id) => {
331                                if prev_imm_id.batch_id() == *imm_id {
332                                    None
333                                } else {
334                                    Some(format!(
335                                        "miss match id {} {}",
336                                        prev_imm_id.batch_id(),
337                                        *imm_id
338                                    ))
339                                }
340                            }
341                        };
342                        assert!(
343                            check_err.is_none(),
344                            "should be valid staging_sst.size {},
345                                    staging_sst.imm_ids {:?},
346                                    staging_sst.epochs {:?},
347                                    local_imm_ids {:?},
348                                    instance_id {}
349                                    check_err {:?}",
350                            staging_sst_ref.imm_size,
351                            staging_sst_ref.imm_ids,
352                            staging_sst_ref.epochs,
353                            self.staging
354                                .imm
355                                .iter()
356                                .map(|imm| imm.batch_id())
357                                .collect_vec(),
358                            self.instance_id,
359                            check_err
360                        );
361                    }
362
363                    self.staging.sst.push_front(staging_sst_ref);
364                }
365            },
366
367            VersionUpdate::CommittedSnapshot(committed_version) => {
368                if let Some(info) = committed_version
369                    .state_table_info
370                    .info()
371                    .get(&self.table_id)
372                {
373                    let committed_epoch = info.committed_epoch;
374                    self.staging.imm.retain(|imm| {
375                        if self.is_replicated {
376                            imm.min_epoch() > committed_epoch
377                        } else {
378                            assert!(imm.min_epoch() > committed_epoch);
379                            true
380                        }
381                    });
382
383                    self.staging.sst.retain(|sst| {
384                        sst.epochs.first().expect("epochs not empty") > &committed_epoch
385                    });
386
387                    // check epochs.last() > MCE
388                    assert!(self.staging.sst.iter().all(|sst| {
389                        sst.epochs.last().expect("epochs not empty") > &committed_epoch
390                    }));
391
392                    if let Some(committed_watermarks) =
393                        self.committed.table_watermarks.get(&self.table_id)
394                    {
395                        if let Some(watermark_index) = &mut self.table_watermarks {
396                            watermark_index.apply_committed_watermarks(
397                                committed_watermarks.clone(),
398                                committed_epoch,
399                            );
400                        } else {
401                            self.table_watermarks = Some(TableWatermarksIndex::new_committed(
402                                committed_watermarks.clone(),
403                                committed_epoch,
404                            ));
405                        }
406                    }
407                }
408
409                self.committed = committed_version;
410            }
411            VersionUpdate::NewTableWatermark {
412                direction,
413                epoch,
414                vnode_watermarks,
415                watermark_type,
416            } => {
417                assert_eq!(WatermarkSerdeType::PkPrefix, watermark_type);
418                if let Some(watermark_index) = &mut self.table_watermarks {
419                    watermark_index.add_epoch_watermark(
420                        epoch,
421                        Arc::from(vnode_watermarks),
422                        direction,
423                    );
424                } else {
425                    self.table_watermarks = Some(TableWatermarksIndex::new(
426                        direction,
427                        epoch,
428                        vnode_watermarks,
429                        self.committed.table_committed_epoch(self.table_id),
430                    ));
431                }
432            }
433        }
434    }
435
436    pub fn staging(&self) -> &StagingVersion {
437        &self.staging
438    }
439
440    pub fn committed(&self) -> &CommittedVersion {
441        &self.committed
442    }
443
444    /// We have assumption that the watermark is increasing monotonically. Therefore,
445    /// here if the upper layer usage has passed an regressed watermark, we should
446    /// filter out the regressed watermark. Currently the kv log store may write
447    /// regressed watermark
448    pub fn filter_regress_watermarks(&self, watermarks: &mut Vec<VnodeWatermark>) {
449        if let Some(watermark_index) = &self.table_watermarks {
450            watermark_index.filter_regress_watermarks(watermarks)
451        }
452    }
453
454    pub fn latest_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
455        self.table_watermarks
456            .as_ref()
457            .and_then(|watermark_index| watermark_index.latest_watermark(vnode))
458    }
459
460    pub fn is_replicated(&self) -> bool {
461        self.is_replicated
462    }
463
464    pub fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
465        std::mem::replace(&mut self.vnodes, vnodes)
466    }
467
468    pub fn contains(&self, vnode: VirtualNode) -> bool {
469        self.vnodes.is_set(vnode.to_index())
470    }
471
472    pub fn vnodes(&self) -> Arc<Bitmap> {
473        self.vnodes.clone()
474    }
475}
476
477pub fn read_filter_for_version(
478    epoch: HummockEpoch,
479    table_id: TableId,
480    mut table_key_range: TableKeyRange,
481    read_version: &RwLock<HummockReadVersion>,
482) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
483    let read_version_guard = read_version.read();
484
485    let committed_version = read_version_guard.committed().clone();
486
487    if let Some(watermark) = read_version_guard.table_watermarks.as_ref() {
488        watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range)
489    }
490
491    let (imm_iter, sst_iter) =
492        read_version_guard
493            .staging()
494            .prune_overlap(epoch, table_id, &table_key_range);
495
496    let imms = imm_iter.cloned().collect();
497    let ssts = sst_iter.cloned().collect();
498
499    Ok((table_key_range, (imms, ssts, committed_version)))
500}
501
502#[derive(Clone)]
503pub struct HummockVersionReader {
504    sstable_store: SstableStoreRef,
505
506    /// Statistics
507    state_store_metrics: Arc<HummockStateStoreMetrics>,
508    preload_retry_times: usize,
509}
510
511/// use `HummockVersionReader` to reuse `get` and `iter` implement for both `batch_query` and
512/// `streaming_query`
513impl HummockVersionReader {
514    pub fn new(
515        sstable_store: SstableStoreRef,
516        state_store_metrics: Arc<HummockStateStoreMetrics>,
517        preload_retry_times: usize,
518    ) -> Self {
519        Self {
520            sstable_store,
521            state_store_metrics,
522            preload_retry_times,
523        }
524    }
525
526    pub fn stats(&self) -> &Arc<HummockStateStoreMetrics> {
527        &self.state_store_metrics
528    }
529}
530
/// Duration threshold, in seconds.
/// NOTE(review): presumably the time after which fetching SST metas for an iterator is
/// considered slow; the use site is not visible here — confirm.
const SLOW_ITER_FETCH_META_DURATION_SECOND: f64 = 5.0;
532
533impl HummockVersionReader {
534    pub async fn get(
535        &self,
536        table_key: TableKey<Bytes>,
537        epoch: u64,
538        read_options: ReadOptions,
539        read_version_tuple: ReadVersionTuple,
540    ) -> StorageResult<Option<StateStoreKeyedRow>> {
541        let (imms, uncommitted_ssts, committed_version) = read_version_tuple;
542
543        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
544        let mut stats_guard =
545            GetLocalMetricsGuard::new(self.state_store_metrics.clone(), read_options.table_id);
546        let local_stats = &mut stats_guard.local_stats;
547        local_stats.found_key = true;
548
549        // 1. read staging data
550        for imm in &imms {
551            // skip imm that only holding out-of-date data
552            if imm.max_epoch() < min_epoch {
553                continue;
554            }
555
556            local_stats.staging_imm_get_count += 1;
557
558            if let Some((data, data_epoch)) = get_from_batch(
559                imm,
560                TableKey(table_key.as_ref()),
561                epoch,
562                &read_options,
563                local_stats,
564            ) {
565                return Ok(if data_epoch.pure_epoch() < min_epoch {
566                    None
567                } else {
568                    data.into_user_value().map(|v| {
569                        (
570                            FullKey::new_with_gap_epoch(
571                                read_options.table_id,
572                                table_key.clone(),
573                                data_epoch,
574                            ),
575                            v,
576                        )
577                    })
578                });
579            }
580        }
581
582        // 2. order guarantee: imm -> sst
583        let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| {
584            Sstable::hash_for_bloom_filter(dist_key.as_ref(), read_options.table_id.table_id())
585        });
586
587        // Here epoch passed in is pure epoch, and we will seek the constructed `full_key` later.
588        // Therefore, it is necessary to construct the `full_key` with `MAX_SPILL_TIMES`, otherwise, the iterator might skip keys with spill offset greater than 0.
589        let full_key = FullKey::new_with_gap_epoch(
590            read_options.table_id,
591            TableKey(table_key.clone()),
592            EpochWithGap::new(epoch, MAX_SPILL_TIMES),
593        );
594        for local_sst in &uncommitted_ssts {
595            local_stats.staging_sst_get_count += 1;
596            if let Some((data, data_epoch)) = get_from_sstable_info(
597                self.sstable_store.clone(),
598                local_sst,
599                full_key.to_ref(),
600                &read_options,
601                dist_key_hash,
602                local_stats,
603            )
604            .await?
605            {
606                return Ok(if data_epoch.pure_epoch() < min_epoch {
607                    None
608                } else {
609                    data.into_user_value().map(|v| {
610                        (
611                            FullKey::new_with_gap_epoch(
612                                read_options.table_id,
613                                table_key.clone(),
614                                data_epoch,
615                            ),
616                            v,
617                        )
618                    })
619                });
620            }
621        }
622        let single_table_key_range = table_key.clone()..=table_key.clone();
623        // 3. read from committed_version sst file
624        // Because SST meta records encoded key range,
625        // the filter key needs to be encoded as well.
626        assert!(committed_version.is_valid());
627        for level in committed_version.levels(read_options.table_id) {
628            if level.table_infos.is_empty() {
629                continue;
630            }
631
632            match level.level_type {
633                LevelType::Overlapping | LevelType::Unspecified => {
634                    let sstable_infos = prune_overlapping_ssts(
635                        &level.table_infos,
636                        read_options.table_id,
637                        &single_table_key_range,
638                    );
639                    for sstable_info in sstable_infos {
640                        local_stats.overlapping_get_count += 1;
641                        if let Some((data, data_epoch)) = get_from_sstable_info(
642                            self.sstable_store.clone(),
643                            sstable_info,
644                            full_key.to_ref(),
645                            &read_options,
646                            dist_key_hash,
647                            local_stats,
648                        )
649                        .await?
650                        {
651                            return Ok(if data_epoch.pure_epoch() < min_epoch {
652                                None
653                            } else {
654                                data.into_user_value().map(|v| {
655                                    (
656                                        FullKey::new_with_gap_epoch(
657                                            read_options.table_id,
658                                            table_key.clone(),
659                                            data_epoch,
660                                        ),
661                                        v,
662                                    )
663                                })
664                            });
665                        }
666                    }
667                }
668                LevelType::Nonoverlapping => {
669                    let mut table_info_idx =
670                        search_sst_idx(&level.table_infos, full_key.user_key.as_ref());
671                    if table_info_idx == 0 {
672                        continue;
673                    }
674                    table_info_idx = table_info_idx.saturating_sub(1);
675                    let ord = level.table_infos[table_info_idx]
676                        .key_range
677                        .compare_right_with_user_key(full_key.user_key.as_ref());
678                    // the case that the key falls into the gap between two ssts
679                    if ord == Ordering::Less {
680                        sync_point!("HUMMOCK_V2::GET::SKIP_BY_NO_FILE");
681                        continue;
682                    }
683
684                    local_stats.non_overlapping_get_count += 1;
685                    if let Some((data, data_epoch)) = get_from_sstable_info(
686                        self.sstable_store.clone(),
687                        &level.table_infos[table_info_idx],
688                        full_key.to_ref(),
689                        &read_options,
690                        dist_key_hash,
691                        local_stats,
692                    )
693                    .await?
694                    {
695                        return Ok(if data_epoch.pure_epoch() < min_epoch {
696                            None
697                        } else {
698                            data.into_user_value().map(|v| {
699                                (
700                                    FullKey::new_with_gap_epoch(
701                                        read_options.table_id,
702                                        table_key.clone(),
703                                        data_epoch,
704                                    ),
705                                    v,
706                                )
707                            })
708                        });
709                    }
710                }
711            }
712        }
713        stats_guard.local_stats.found_key = false;
714        Ok(None)
715    }
716
717    pub async fn iter(
718        &self,
719        table_key_range: TableKeyRange,
720        epoch: u64,
721        read_options: ReadOptions,
722        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
723    ) -> StorageResult<HummockStorageIterator> {
724        self.iter_with_memtable(
725            table_key_range,
726            epoch,
727            read_options,
728            read_version_tuple,
729            None,
730        )
731        .await
732    }
733
734    pub async fn iter_with_memtable<'b>(
735        &self,
736        table_key_range: TableKeyRange,
737        epoch: u64,
738        read_options: ReadOptions,
739        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
740        memtable_iter: Option<MemTableHummockIterator<'b>>,
741    ) -> StorageResult<HummockStorageIteratorInner<'b>> {
742        let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
743        let user_key_range = (
744            user_key_range_ref.0.map(|key| key.cloned()),
745            user_key_range_ref.1.map(|key| key.cloned()),
746        );
747        let mut factory = ForwardIteratorFactory::default();
748        let mut local_stats = StoreLocalStatistic::default();
749        let (imms, uncommitted_ssts, committed) = read_version_tuple;
750        let table_id = read_options.table_id;
751        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
752        self.iter_inner(
753            table_key_range,
754            epoch,
755            read_options,
756            imms,
757            uncommitted_ssts,
758            &committed,
759            &mut local_stats,
760            &mut factory,
761        )
762        .await?;
763        let merge_iter = factory.build(memtable_iter);
764        // the epoch_range left bound for iterator read
765        let mut user_iter = UserIterator::new(
766            merge_iter,
767            user_key_range,
768            epoch,
769            min_epoch,
770            Some(committed),
771        );
772        user_iter.rewind().await?;
773        Ok(HummockStorageIteratorInner::new(
774            user_iter,
775            self.state_store_metrics.clone(),
776            table_id,
777            local_stats,
778        ))
779    }
780
781    pub async fn rev_iter<'b>(
782        &self,
783        table_key_range: TableKeyRange,
784        epoch: u64,
785        read_options: ReadOptions,
786        read_version_tuple: (Vec<ImmutableMemtable>, Vec<SstableInfo>, CommittedVersion),
787        memtable_iter: Option<MemTableHummockRevIterator<'b>>,
788    ) -> StorageResult<HummockStorageRevIteratorInner<'b>> {
789        let user_key_range_ref = bound_table_key_range(read_options.table_id, &table_key_range);
790        let user_key_range = (
791            user_key_range_ref.0.map(|key| key.cloned()),
792            user_key_range_ref.1.map(|key| key.cloned()),
793        );
794        let mut factory = BackwardIteratorFactory::default();
795        let mut local_stats = StoreLocalStatistic::default();
796        let (imms, uncommitted_ssts, committed) = read_version_tuple;
797        let table_id = read_options.table_id;
798        let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref());
799        self.iter_inner(
800            table_key_range,
801            epoch,
802            read_options,
803            imms,
804            uncommitted_ssts,
805            &committed,
806            &mut local_stats,
807            &mut factory,
808        )
809        .await?;
810        let merge_iter = factory.build(memtable_iter);
811        // the epoch_range left bound for iterator read
812        let mut user_iter = BackwardUserIterator::new(
813            merge_iter,
814            user_key_range,
815            epoch,
816            min_epoch,
817            Some(committed),
818        );
819        user_iter.rewind().await?;
820        Ok(HummockStorageRevIteratorInner::new(
821            user_iter,
822            self.state_store_metrics.clone(),
823            table_id,
824            local_stats,
825        ))
826    }
827
828    pub async fn iter_inner<F: IteratorFactory>(
829        &self,
830        table_key_range: TableKeyRange,
831        epoch: u64,
832        read_options: ReadOptions,
833        imms: Vec<ImmutableMemtable>,
834        uncommitted_ssts: Vec<SstableInfo>,
835        committed: &CommittedVersion,
836        local_stats: &mut StoreLocalStatistic,
837        factory: &mut F,
838    ) -> StorageResult<()> {
839        local_stats.staging_imm_iter_count = imms.len() as u64;
840        for imm in imms {
841            factory.add_batch_iter(imm);
842        }
843
844        // 2. build iterator from committed
845        // Because SST meta records encoded key range,
846        // the filter key range needs to be encoded as well.
847        let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range);
848        let user_key_range_ref = (
849            user_key_range.0.as_ref().map(UserKey::as_ref),
850            user_key_range.1.as_ref().map(UserKey::as_ref),
851        );
852        let mut staging_sst_iter_count = 0;
853        // encode once
854        let bloom_filter_prefix_hash = read_options
855            .prefix_hint
856            .as_ref()
857            .map(|hint| Sstable::hash_for_bloom_filter(hint, read_options.table_id.table_id()));
858        let mut sst_read_options = SstableIteratorReadOptions::from_read_options(&read_options);
859        if read_options.prefetch_options.prefetch {
860            sst_read_options.must_iterated_end_user_key =
861                Some(user_key_range.1.map(|key| key.cloned()));
862            sst_read_options.max_preload_retry_times = self.preload_retry_times;
863        }
864        let sst_read_options = Arc::new(sst_read_options);
865        for sstable_info in &uncommitted_ssts {
866            let table_holder = self
867                .sstable_store
868                .sstable(sstable_info, local_stats)
869                .await?;
870
871            if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() {
872                if !hit_sstable_bloom_filter(
873                    &table_holder,
874                    &user_key_range_ref,
875                    *prefix_hash,
876                    local_stats,
877                ) {
878                    continue;
879                }
880            }
881
882            staging_sst_iter_count += 1;
883            factory.add_staging_sst_iter(F::SstableIteratorType::create(
884                table_holder,
885                self.sstable_store.clone(),
886                sst_read_options.clone(),
887                sstable_info,
888            ));
889        }
890        local_stats.staging_sst_iter_count = staging_sst_iter_count;
891
892        let timer = Instant::now();
893
894        for level in committed.levels(read_options.table_id) {
895            if level.table_infos.is_empty() {
896                continue;
897            }
898
899            if level.level_type == LevelType::Nonoverlapping {
900                let mut table_infos = prune_nonoverlapping_ssts(
901                    &level.table_infos,
902                    user_key_range_ref,
903                    read_options.table_id.table_id(),
904                )
905                .peekable();
906
907                if table_infos.peek().is_none() {
908                    continue;
909                }
910                let sstable_infos = table_infos.cloned().collect_vec();
911                if sstable_infos.len() > 1 {
912                    factory.add_concat_sst_iter(
913                        sstable_infos,
914                        self.sstable_store.clone(),
915                        sst_read_options.clone(),
916                    );
917                    local_stats.non_overlapping_iter_count += 1;
918                } else {
919                    let sstable = self
920                        .sstable_store
921                        .sstable(&sstable_infos[0], local_stats)
922                        .await?;
923
924                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
925                        if !hit_sstable_bloom_filter(
926                            &sstable,
927                            &user_key_range_ref,
928                            *dist_hash,
929                            local_stats,
930                        ) {
931                            continue;
932                        }
933                    }
934                    // Since there is only one sst to be included for the current non-overlapping
935                    // level, there is no need to create a ConcatIterator on it.
936                    // We put the SstableIterator in `overlapping_iters` just for convenience since
937                    // it overlaps with SSTs in other levels. In metrics reporting, we still count
938                    // it in `non_overlapping_iter_count`.
939                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
940                        sstable,
941                        self.sstable_store.clone(),
942                        sst_read_options.clone(),
943                        &sstable_infos[0],
944                    ));
945                    local_stats.non_overlapping_iter_count += 1;
946                }
947            } else {
948                let table_infos = prune_overlapping_ssts(
949                    &level.table_infos,
950                    read_options.table_id,
951                    &table_key_range,
952                );
953                // Overlapping
954                let fetch_meta_req = table_infos.rev().collect_vec();
955                if fetch_meta_req.is_empty() {
956                    continue;
957                }
958                for sstable_info in fetch_meta_req {
959                    let sstable = self
960                        .sstable_store
961                        .sstable(sstable_info, local_stats)
962                        .await?;
963                    assert_eq!(sstable_info.object_id, sstable.id);
964                    if let Some(dist_hash) = bloom_filter_prefix_hash.as_ref() {
965                        if !hit_sstable_bloom_filter(
966                            &sstable,
967                            &user_key_range_ref,
968                            *dist_hash,
969                            local_stats,
970                        ) {
971                            continue;
972                        }
973                    }
974                    factory.add_overlapping_sst_iter(F::SstableIteratorType::create(
975                        sstable,
976                        self.sstable_store.clone(),
977                        sst_read_options.clone(),
978                        sstable_info,
979                    ));
980                    local_stats.overlapping_iter_count += 1;
981                }
982            }
983        }
984        let fetch_meta_duration_sec = timer.elapsed().as_secs_f64();
985        if fetch_meta_duration_sec > SLOW_ITER_FETCH_META_DURATION_SECOND {
986            let table_id_string = read_options.table_id.to_string();
987            tracing::warn!(
988                "Fetching meta while creating an iter to read table_id {:?} at epoch {:?} is slow: duration = {:?}s, cache unhits = {:?}.",
989                table_id_string,
990                epoch,
991                fetch_meta_duration_sec,
992                local_stats.cache_meta_block_miss
993            );
994            self.state_store_metrics
995                .iter_slow_fetch_meta_cache_unhits
996                .set(local_stats.cache_meta_block_miss as i64);
997        }
998        Ok(())
999    }
1000
1001    pub async fn iter_log(
1002        &self,
1003        version: PinnedVersion,
1004        epoch_range: (u64, u64),
1005        key_range: TableKeyRange,
1006        options: ReadLogOptions,
1007    ) -> HummockResult<ChangeLogIterator> {
1008        let change_log = {
1009            let table_change_logs = version.table_change_log_read_lock();
1010            if let Some(change_log) = table_change_logs.get(&options.table_id) {
1011                change_log.filter_epoch(epoch_range).cloned().collect_vec()
1012            } else {
1013                Vec::new()
1014            }
1015        };
1016
1017        if let Some(max_epoch_change_log) = change_log.last() {
1018            let (_, max_epoch) = epoch_range;
1019            if !max_epoch_change_log.epochs.contains(&max_epoch) {
1020                warn!(
1021                    max_epoch,
1022                    change_log_epochs = ?change_log.iter().flat_map(|epoch_log| epoch_log.epochs.iter()).collect_vec(),
1023                    table_id = options.table_id.table_id,
1024                    "max_epoch does not exist"
1025                );
1026            }
1027        }
1028        let read_options = Arc::new(SstableIteratorReadOptions {
1029            cache_policy: Default::default(),
1030            must_iterated_end_user_key: None,
1031            max_preload_retry_times: 0,
1032            prefetch_for_large_query: false,
1033        });
1034
1035        async fn make_iter(
1036            sstable_infos: impl Iterator<Item = &SstableInfo>,
1037            sstable_store: &SstableStoreRef,
1038            read_options: Arc<SstableIteratorReadOptions>,
1039            local_stat: &mut StoreLocalStatistic,
1040        ) -> HummockResult<MergeIterator<SstableIterator>> {
1041            let iters = try_join_all(sstable_infos.map(|sstable_info| {
1042                let sstable_store = sstable_store.clone();
1043                let read_options = read_options.clone();
1044                async move {
1045                    let mut local_stat = StoreLocalStatistic::default();
1046                    let table_holder = sstable_store.sstable(sstable_info, &mut local_stat).await?;
1047                    Ok::<_, HummockError>((
1048                        SstableIterator::new(
1049                            table_holder,
1050                            sstable_store,
1051                            read_options,
1052                            sstable_info,
1053                        ),
1054                        local_stat,
1055                    ))
1056                }
1057            }))
1058            .await?;
1059            Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
1060                |(iter, stats)| {
1061                    local_stat.add(&stats);
1062                    iter
1063                },
1064            )))
1065        }
1066
1067        let mut local_stat = StoreLocalStatistic::default();
1068
1069        let new_value_iter = make_iter(
1070            change_log
1071                .iter()
1072                .flat_map(|log| log.new_value.iter())
1073                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1074            &self.sstable_store,
1075            read_options.clone(),
1076            &mut local_stat,
1077        )
1078        .await?;
1079        let old_value_iter = make_iter(
1080            change_log
1081                .iter()
1082                .flat_map(|log| log.old_value.iter())
1083                .filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
1084            &self.sstable_store,
1085            read_options.clone(),
1086            &mut local_stat,
1087        )
1088        .await?;
1089        ChangeLogIterator::new(
1090            epoch_range,
1091            key_range,
1092            new_value_iter,
1093            old_value_iter,
1094            options.table_id,
1095            IterLocalMetricsGuard::new(
1096                self.state_store_metrics.clone(),
1097                options.table_id,
1098                local_stat,
1099            ),
1100        )
1101        .await
1102    }
1103}