// risingwave_storage/hummock/iterator/concat_inner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cmp::Ordering::{Equal, Greater, Less};
16use std::sync::Arc;
17
18use risingwave_hummock_sdk::key::FullKey;
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use crate::hummock::iterator::{
22    DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
23};
24use crate::hummock::sstable::SstableIteratorReadOptions;
25use crate::hummock::value::HummockValue;
26use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef};
27use crate::monitor::StoreLocalStatistic;
28
29fn smallest_key(sstable_info: &SstableInfo) -> &[u8] {
30    &sstable_info.key_range.left
31}
32
33fn largest_key(sstable_info: &SstableInfo) -> &[u8] {
34    &sstable_info.key_range.right
35}
36
37/// Served as the concrete implementation of `ConcatIterator` and `BackwardConcatIterator`.
38pub struct ConcatIteratorInner<TI: SstableIteratorType> {
39    /// The iterator of the current table.
40    sstable_iter: Option<TI>,
41
42    /// Current table index.
43    cur_idx: usize,
44
45    /// All non-overlapping `sstable_infos`.
46    sstable_infos: Vec<SstableInfo>,
47
48    sstable_store: SstableStoreRef,
49
50    stats: StoreLocalStatistic,
51    read_options: Arc<SstableIteratorReadOptions>,
52}
53
54impl<TI: SstableIteratorType> ConcatIteratorInner<TI> {
55    /// Caller should make sure that `sstable_infos` are non-overlapping,
56    /// arranged in ascending order when it serves as a forward iterator,
57    /// and arranged in descending order when it serves as a backward iterator.
58    pub fn new(
59        sstable_infos: Vec<SstableInfo>,
60        sstable_store: SstableStoreRef,
61        read_options: Arc<SstableIteratorReadOptions>,
62    ) -> Self {
63        Self {
64            sstable_iter: None,
65            cur_idx: 0,
66            sstable_infos,
67            sstable_store,
68            stats: StoreLocalStatistic::default(),
69            read_options,
70        }
71    }
72
73    /// Seeks to a table, and then seeks to the key if `seek_key` is given.
74    async fn seek_idx(
75        &mut self,
76        idx: usize,
77        seek_key: Option<FullKey<&[u8]>>,
78    ) -> HummockResult<()> {
79        if idx >= self.sstable_infos.len() {
80            if let Some(old_iter) = self.sstable_iter.take() {
81                old_iter.collect_local_statistic(&mut self.stats);
82            }
83            self.cur_idx = self.sstable_infos.len();
84        } else {
85            let table = self
86                .sstable_store
87                .sstable(&self.sstable_infos[idx], &mut self.stats)
88                .await?;
89            let mut sstable_iter = TI::create(
90                table,
91                self.sstable_store.clone(),
92                self.read_options.clone(),
93                &self.sstable_infos[idx],
94            );
95
96            if let Some(key) = seek_key {
97                sstable_iter.seek(key).await?;
98            } else {
99                sstable_iter.rewind().await?;
100            }
101
102            if let Some(old_iter) = self.sstable_iter.take() {
103                old_iter.collect_local_statistic(&mut self.stats);
104            }
105
106            self.sstable_iter = Some(sstable_iter);
107            self.cur_idx = idx;
108        }
109        Ok(())
110    }
111}
112
113impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> {
114    type Direction = TI::Direction;
115
116    async fn next(&mut self) -> HummockResult<()> {
117        let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
118        sstable_iter.next().await?;
119
120        if sstable_iter.is_valid() {
121            Ok(())
122        } else {
123            // seek to next table
124            let mut table_idx = self.cur_idx + 1;
125            while !self.is_valid() && table_idx < self.sstable_infos.len() {
126                self.seek_idx(table_idx, None).await?;
127                table_idx += 1;
128            }
129            Ok(())
130        }
131    }
132
133    fn key(&self) -> FullKey<&[u8]> {
134        self.sstable_iter.as_ref().expect("no table iter").key()
135    }
136
137    fn value(&self) -> HummockValue<&[u8]> {
138        self.sstable_iter.as_ref().expect("no table iter").value()
139    }
140
141    fn is_valid(&self) -> bool {
142        self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
143    }
144
145    async fn rewind(&mut self) -> HummockResult<()> {
146        self.seek_idx(0, None).await?;
147        let mut table_idx = 1;
148        while !self.is_valid() && table_idx < self.sstable_infos.len() {
149            // Seek to next table
150            self.seek_idx(table_idx, None).await?;
151            table_idx += 1;
152        }
153        Ok(())
154    }
155
156    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
157        let mut table_idx = self
158            .sstable_infos
159            .partition_point(|table| match Self::Direction::direction() {
160                DirectionEnum::Forward => {
161                    let ord = FullKey::decode(smallest_key(table)).cmp(&key);
162
163                    ord == Less || ord == Equal
164                }
165                DirectionEnum::Backward => {
166                    let ord = FullKey::decode(largest_key(table)).cmp(&key);
167                    ord == Greater || (ord == Equal && !table.key_range.right_exclusive)
168                }
169            })
170            .saturating_sub(1); // considering the boundary of 0
171
172        self.seek_idx(table_idx, Some(key)).await?;
173        table_idx += 1;
174        while !self.is_valid() && table_idx < self.sstable_infos.len() {
175            // Seek to next table
176            self.seek_idx(table_idx, None).await?;
177            table_idx += 1;
178        }
179        Ok(())
180    }
181
182    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
183        stats.add(&self.stats);
184        if let Some(iter) = &self.sstable_iter {
185            iter.collect_local_statistic(stats);
186        }
187    }
188
189    fn value_meta(&self) -> ValueMeta {
190        self.sstable_iter
191            .as_ref()
192            .expect("no table iter")
193            .value_meta()
194    }
195}