risingwave_storage/hummock/iterator/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::marker::PhantomData;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19
20use more_asserts::{assert_gt, assert_lt};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22
23use super::{
24    HummockResult, HummockValue, SstableIteratorReadOptions, SstableIteratorType, SstableStoreRef,
25};
26
27mod forward_concat;
28pub use forward_concat::*;
29mod backward_concat;
30mod concat_inner;
31pub use backward_concat::*;
32pub use concat_inner::ConcatIteratorInner;
33mod backward_merge;
34
35mod backward_user;
36pub use backward_user::*;
37mod forward_merge;
38
39pub mod forward_user;
40mod merge_inner;
41pub use forward_user::*;
42pub use merge_inner::MergeIterator;
43use risingwave_hummock_sdk::EpochWithGap;
44use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
45
46use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third};
47
48pub mod change_log;
49mod skip_watermark;
50#[cfg(any(test, feature = "test"))]
51pub mod test_utils;
52
53use risingwave_common::catalog::TableId;
54pub use skip_watermark::*;
55
56use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
57use crate::monitor::StoreLocalStatistic;
58
/// Optional metadata attached to the value an iterator is currently positioned at.
///
/// Every field is optional. `ValueMeta::default()` leaves all of them unset, which is what
/// iterators without such information (e.g. `FromRustIterator`) report.
#[derive(Default)]
pub struct ValueMeta {
    /// Presumably the id of the storage object the value was read from, if known — TODO confirm.
    pub object_id: Option<u64>,
    /// Presumably the id of the block (within that object) holding the value, if known — TODO
    /// confirm.
    pub block_id: Option<u64>,
}
64
65/// `HummockIterator` defines the interface of all iterators, including `SstableIterator`,
66/// `MergeIterator`, `UserIterator` and `ConcatIterator`.
67///
68/// After creating the iterator instance,
69/// - if you want to iterate from the beginning, you need to then call its `rewind` method.
70/// - if you want to iterate from some specific position, you need to then call its `seek` method.
71pub trait HummockIterator: Send {
72    type Direction: HummockIteratorDirection;
73    /// Moves a valid iterator to the next key.
74    ///
75    /// Note:
76    /// - Before calling this function, makes sure the iterator `is_valid`.
77    /// - After calling this function, you may first check whether the iterator `is_valid` again,
78    ///   then get the new data by calling `key` and `value`.
79    /// - If the position after calling this is invalid, this function WON'T return an `Err`. You
80    ///   should check `is_valid` before continuing the iteration.
81    ///
82    /// # Panics
83    /// This function will panic if the iterator is invalid.
84    fn next(&mut self) -> impl Future<Output = HummockResult<()>> + Send + '_;
85
86    /// Retrieves the current key.
87    ///
88    /// Note:
89    /// - Before calling this function, makes sure the iterator `is_valid`.
90    /// - This function should be straightforward and return immediately.
91    ///
92    /// # Panics
93    /// This function will panic if the iterator is invalid.
94    fn key(&self) -> FullKey<&[u8]>;
95
96    /// Retrieves the current value, decoded as [`HummockValue`].
97    ///
98    /// Note:
99    /// - Before calling this function, makes sure the iterator `is_valid`.
100    /// - This function should be straightforward and return immediately.
101    ///
102    /// # Panics
103    /// This function will panic if the iterator is invalid, or the value cannot be decoded into
104    /// [`HummockValue`].
105    fn value(&self) -> HummockValue<&[u8]>;
106
107    /// Indicates whether the iterator can be used.
108    ///
109    /// Note:
110    /// - ONLY call `key`, `value`, and `next` if `is_valid` returns `true`.
111    /// - This function should be straightforward and return immediately.
112    fn is_valid(&self) -> bool;
113
114    /// Resets the position of the iterator.
115    ///
116    /// Note:
117    /// - Do not decide whether the position is valid or not by checking the returned error of this
118    ///   function. This function WON'T return an `Err` if invalid. You should check `is_valid`
119    ///   before starting iteration.
120    fn rewind(&mut self) -> impl Future<Output = HummockResult<()>> + Send + '_;
121
122    /// Resets iterator and seeks to the first position where the key >= provided key, or key <=
123    /// provided key if this is a backward iterator.
124    ///
125    /// Note:
126    /// - Do not decide whether the position is valid or not by checking the returned error of this
127    ///   function. This function WON'T return an `Err` if invalid. You should check `is_valid`
128    ///   before starting iteration.
129    fn seek<'a>(
130        &'a mut self,
131        key: FullKey<&'a [u8]>,
132    ) -> impl Future<Output = HummockResult<()>> + Send;
133
134    /// take local statistic info from iterator to report metrics.
135    fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic);
136
137    /// Returns value meta.
138    fn value_meta(&self) -> ValueMeta;
139}
140
141/// This is a placeholder trait used in `HummockIteratorUnion`
142pub struct PhantomHummockIterator<D: HummockIteratorDirection> {
143    _phantom: PhantomData<D>,
144}
145
146impl<D: HummockIteratorDirection> HummockIterator for PhantomHummockIterator<D> {
147    type Direction = D;
148
149    async fn next(&mut self) -> HummockResult<()> {
150        unreachable!()
151    }
152
153    fn key(&self) -> FullKey<&[u8]> {
154        unreachable!()
155    }
156
157    fn value(&self) -> HummockValue<&[u8]> {
158        unreachable!()
159    }
160
161    fn is_valid(&self) -> bool {
162        unreachable!()
163    }
164
165    async fn rewind(&mut self) -> HummockResult<()> {
166        unreachable!()
167    }
168
169    async fn seek<'a>(&'a mut self, _key: FullKey<&'a [u8]>) -> HummockResult<()> {
170        unreachable!()
171    }
172
173    fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
174
175    fn value_meta(&self) -> ValueMeta {
176        unreachable!()
177    }
178}
179
180/// The `HummockIteratorUnion` acts like a wrapper over multiple types of `HummockIterator`, so that
181/// the `MergeIterator`, which previously takes multiple different `HummockIterator`s as input
182/// through `Box<dyn HummockIterator>`, can now wrap all its underlying `HummockIterator` over such
183/// `HummockIteratorUnion`, and the input type of the `MergeIterator` so that the input type of
184/// `HummockIterator` can be determined statically at compile time.
185///
186/// For example, in `ForwardUserIterator`, it accepts inputs from 4 sources for its underlying
187/// `MergeIterator`. First, the shared buffer replicated batches, and second, the shared buffer
188/// uncommitted data, and third, the overlapping L0 data, and last, the non-L0 non-overlapping
189/// concat-able. These sources used to be passed in as `Box<dyn HummockIterator>`. Now if we want
190/// the `MergeIterator` to be statically typed, the input type of `MergeIterator` will become the
191/// `HummockIteratorUnion` of these 4 sources.
192pub enum HummockIteratorUnion<
193    D: HummockIteratorDirection,
194    I1: HummockIterator<Direction = D>,
195    I2: HummockIterator<Direction = D>,
196    I3: HummockIterator<Direction = D> = PhantomHummockIterator<D>,
197    I4: HummockIterator<Direction = D> = PhantomHummockIterator<D>,
198> {
199    First(I1),
200    Second(I2),
201    Third(I3),
202    Fourth(I4),
203}
204
205impl<
206    D: HummockIteratorDirection,
207    I1: HummockIterator<Direction = D>,
208    I2: HummockIterator<Direction = D>,
209    I3: HummockIterator<Direction = D>,
210    I4: HummockIterator<Direction = D>,
211> HummockIterator for HummockIteratorUnion<D, I1, I2, I3, I4>
212{
213    type Direction = D;
214
215    async fn next(&mut self) -> HummockResult<()> {
216        match self {
217            First(iter) => iter.next().await,
218            Second(iter) => iter.next().await,
219            Third(iter) => iter.next().await,
220            Fourth(iter) => iter.next().await,
221        }
222    }
223
224    fn key(&self) -> FullKey<&[u8]> {
225        match self {
226            First(iter) => iter.key(),
227            Second(iter) => iter.key(),
228            Third(iter) => iter.key(),
229            Fourth(iter) => iter.key(),
230        }
231    }
232
233    fn value(&self) -> HummockValue<&[u8]> {
234        match self {
235            First(iter) => iter.value(),
236            Second(iter) => iter.value(),
237            Third(iter) => iter.value(),
238            Fourth(iter) => iter.value(),
239        }
240    }
241
242    fn is_valid(&self) -> bool {
243        match self {
244            First(iter) => iter.is_valid(),
245            Second(iter) => iter.is_valid(),
246            Third(iter) => iter.is_valid(),
247            Fourth(iter) => iter.is_valid(),
248        }
249    }
250
251    async fn rewind(&mut self) -> HummockResult<()> {
252        match self {
253            First(iter) => iter.rewind().await,
254            Second(iter) => iter.rewind().await,
255            Third(iter) => iter.rewind().await,
256            Fourth(iter) => iter.rewind().await,
257        }
258    }
259
260    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
261        match self {
262            First(iter) => iter.seek(key).await,
263            Second(iter) => iter.seek(key).await,
264            Third(iter) => iter.seek(key).await,
265            Fourth(iter) => iter.seek(key).await,
266        }
267    }
268
269    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
270        match self {
271            First(iter) => iter.collect_local_statistic(stats),
272            Second(iter) => iter.collect_local_statistic(stats),
273            Third(iter) => iter.collect_local_statistic(stats),
274            Fourth(iter) => iter.collect_local_statistic(stats),
275        }
276    }
277
278    fn value_meta(&self) -> ValueMeta {
279        match self {
280            First(iter) => iter.value_meta(),
281            Second(iter) => iter.value_meta(),
282            Third(iter) => iter.value_meta(),
283            Fourth(iter) => iter.value_meta(),
284        }
285    }
286}
287
288impl<I: HummockIterator> HummockIterator for Box<I> {
289    type Direction = I::Direction;
290
291    async fn next(&mut self) -> HummockResult<()> {
292        (*self).deref_mut().next().await
293    }
294
295    fn key(&self) -> FullKey<&[u8]> {
296        (*self).deref().key()
297    }
298
299    fn value(&self) -> HummockValue<&[u8]> {
300        (*self).deref().value()
301    }
302
303    fn is_valid(&self) -> bool {
304        (*self).deref().is_valid()
305    }
306
307    async fn rewind(&mut self) -> HummockResult<()> {
308        (*self).deref_mut().rewind().await
309    }
310
311    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
312        (*self).deref_mut().seek(key).await
313    }
314
315    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
316        (*self).deref().collect_local_statistic(stats);
317    }
318
319    fn value_meta(&self) -> ValueMeta {
320        (*self).deref().value_meta()
321    }
322}
323
324pub enum RustIteratorOfBuilder<'a, B: RustIteratorBuilder> {
325    Seek(B::SeekIter<'a>),
326    Rewind(B::RewindIter<'a>),
327}
328
329impl<'a, B: RustIteratorBuilder> Iterator for RustIteratorOfBuilder<'a, B> {
330    type Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>);
331
332    fn next(&mut self) -> Option<Self::Item> {
333        match self {
334            RustIteratorOfBuilder::Seek(i) => i.next(),
335            RustIteratorOfBuilder::Rewind(i) => i.next(),
336        }
337    }
338}
339
340pub trait RustIteratorBuilder: Send + Sync + 'static {
341    type Iterable: Send + Sync;
342    type Direction: HummockIteratorDirection;
343    type RewindIter<'a>: Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)>
344        + Send
345        + Sync
346        + 'a;
347    type SeekIter<'a>: Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)>
348        + Send
349        + Sync
350        + 'a;
351
352    fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a>;
353    fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_>;
354}
355
356pub struct FromRustIterator<'a, B: RustIteratorBuilder> {
357    inner: &'a B::Iterable,
358    #[expect(clippy::type_complexity)]
359    iter: Option<(
360        RustIteratorOfBuilder<'a, B>,
361        TableKey<&'a [u8]>,
362        HummockValue<&'a [u8]>,
363    )>,
364    epoch: EpochWithGap,
365    table_id: TableId,
366}
367
368impl<'a, B: RustIteratorBuilder> FromRustIterator<'a, B> {
369    pub fn new(inner: &'a B::Iterable, epoch: EpochWithGap, table_id: TableId) -> Self {
370        Self {
371            inner,
372            iter: None,
373            epoch,
374            table_id,
375        }
376    }
377
378    async fn seek_inner<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
379        match self.table_id.cmp(&key.user_key.table_id) {
380            std::cmp::Ordering::Less => {
381                self.iter = None;
382                return Ok(());
383            }
384            std::cmp::Ordering::Greater => {
385                return self.rewind().await;
386            }
387            _ => {}
388        }
389        let mut iter = B::seek(self.inner, key.user_key.table_key);
390        match iter.next() {
391            Some((first_key, first_value)) => {
392                if first_key.eq(&key.user_key.table_key) && self.epoch > key.epoch_with_gap {
393                    // The semantic of `seek_fn` will ensure that `first_key` >= table_key of `key`.
394                    // At the beginning we have checked that `self.table_id` >= table_id of `key`.
395                    match iter.next() {
396                        Some((next_key, next_value)) => {
397                            assert_gt!(next_key, first_key);
398                            self.iter =
399                                Some((RustIteratorOfBuilder::Seek(iter), next_key, next_value));
400                        }
401                        None => {
402                            self.iter = None;
403                        }
404                    }
405                } else {
406                    self.iter = Some((RustIteratorOfBuilder::Seek(iter), first_key, first_value));
407                }
408            }
409            None => {
410                self.iter = None;
411            }
412        }
413        Ok(())
414    }
415
416    async fn rev_seek_inner<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
417        match self.table_id.cmp(&key.user_key.table_id) {
418            std::cmp::Ordering::Less => {
419                return self.rewind().await;
420            }
421            std::cmp::Ordering::Greater => {
422                self.iter = None;
423                return Ok(());
424            }
425            _ => {}
426        }
427        let mut iter = B::seek(self.inner, key.user_key.table_key);
428        match iter.next() {
429            Some((first_key, first_value)) => {
430                if first_key.eq(&key.user_key.table_key) && self.epoch < key.epoch_with_gap {
431                    // The semantic of `seek_fn` will ensure that `first_key` <= table_key of `key`.
432                    // At the beginning we have checked that `self.table_id` <= table_id of `key`.
433                    match iter.next() {
434                        Some((next_key, next_value)) => {
435                            assert_lt!(next_key, first_key);
436                            self.iter =
437                                Some((RustIteratorOfBuilder::Seek(iter), next_key, next_value));
438                        }
439                        None => {
440                            self.iter = None;
441                        }
442                    }
443                } else {
444                    self.iter = Some((RustIteratorOfBuilder::Seek(iter), first_key, first_value));
445                }
446            }
447            None => {
448                self.iter = None;
449            }
450        }
451        Ok(())
452    }
453}
454
455impl<B: RustIteratorBuilder> HummockIterator for FromRustIterator<'_, B> {
456    type Direction = B::Direction;
457
458    async fn next(&mut self) -> HummockResult<()> {
459        let (iter, key, value) = self.iter.as_mut().expect("should be valid");
460        if let Some((new_key, new_value)) = iter.next() {
461            *key = new_key;
462            *value = new_value;
463        } else {
464            self.iter = None;
465        }
466        Ok(())
467    }
468
469    fn key(&self) -> FullKey<&[u8]> {
470        let (_, key, _) = self.iter.as_ref().expect("should be valid");
471        FullKey {
472            epoch_with_gap: self.epoch,
473            user_key: UserKey {
474                table_id: self.table_id,
475                table_key: *key,
476            },
477        }
478    }
479
480    fn value(&self) -> HummockValue<&[u8]> {
481        let (_, _, value) = self.iter.as_ref().expect("should be valid");
482        *value
483    }
484
485    fn is_valid(&self) -> bool {
486        self.iter.is_some()
487    }
488
489    async fn rewind(&mut self) -> HummockResult<()> {
490        let mut iter = B::rewind(self.inner);
491        if let Some((key, value)) = iter.next() {
492            self.iter = Some((RustIteratorOfBuilder::Rewind(iter), key, value));
493        } else {
494            self.iter = None;
495        }
496        Ok(())
497    }
498
499    async fn seek<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
500        match Self::Direction::direction() {
501            DirectionEnum::Forward => self.seek_inner(key).await,
502            DirectionEnum::Backward => self.rev_seek_inner(key).await,
503        }
504    }
505
506    fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
507
508    fn value_meta(&self) -> ValueMeta {
509        ValueMeta::default()
510    }
511}
512
/// Runtime value naming the direction of a Hummock iterator.
#[derive(PartialEq, Eq, Debug)]
pub enum DirectionEnum {
    /// Seeks land on the first key >= the target, walking towards larger keys.
    Forward,
    /// Seeks land on the first key <= the target, walking towards smaller keys.
    Backward,
}

/// Type-level marker for an iteration direction. [`Forward`] and [`Backward`] implement it.
pub trait HummockIteratorDirection: Sync + Send + 'static {
    /// Returns the [`DirectionEnum`] value corresponding to this marker type.
    fn direction() -> DirectionEnum;
}

/// Marker type selecting forward iteration.
pub struct Forward;

impl HummockIteratorDirection for Forward {
    #[inline(always)]
    fn direction() -> DirectionEnum {
        DirectionEnum::Forward
    }
}

/// Marker type selecting backward iteration.
pub struct Backward;

impl HummockIteratorDirection for Backward {
    #[inline(always)]
    fn direction() -> DirectionEnum {
        DirectionEnum::Backward
    }
}
538
539pub trait IteratorFactory {
540    type Direction: HummockIteratorDirection;
541    type SstableIteratorType: SstableIteratorType<Direction = Self::Direction>;
542    fn add_batch_iter(&mut self, batch: SharedBufferBatch);
543    fn add_staging_sst_iter(&mut self, sst: Self::SstableIteratorType);
544    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType);
545    fn add_concat_sst_iter(
546        &mut self,
547        sstable_table_infos: Vec<SstableInfo>,
548        sstable_store: SstableStoreRef,
549        read_options: Arc<SstableIteratorReadOptions>,
550    );
551}
552
553/// This trait is used to maintain the state of whether the watermark has been skipped.
554pub trait SkipWatermarkState: Send {
555    /// Returns whether there are any unused watermarks in the state.
556    fn has_watermark(&self) -> bool;
557    /// Returns whether the incoming key needs to be deleted after watermark filtering.
558    /// Note: Each `table_id` has multiple `watermarks`, and state defaults to forward traversal of `vnodes`, so you must use forward traversal of the incoming key for it to be filtered correctly.
559    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool;
560    /// Resets the watermark state.
561    fn reset_watermark(&mut self);
562    /// Determines if the current `watermark` is exhausted by the passed-in key, advances to the next `watermark`, and returns whether the key needs to be deleted.
563    fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool;
564}