risingwave_storage/hummock/iterator/merge_inner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, LinkedList};
use std::ops::{Deref, DerefMut};

use futures::FutureExt;
use risingwave_hummock_sdk::key::FullKey;

use super::Forward;
use crate::hummock::HummockResult;
use crate::hummock::iterator::{
    DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatchIterator, SharedBufferVersionedEntryRef,
};
use crate::hummock::value::HummockValue;
use crate::monitor::StoreLocalStatistic;

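/// A node in the merge heap, wrapping an inner iterator. Its ordering is defined by the
/// iterator's current key (see the `Ord` impl below).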
pub struct Node<I: HummockIterator> {
    iter: I,
}

impl<I: HummockIterator> Eq for Node<I> where Self: PartialEq {}
impl<I: HummockIterator> PartialOrd for Node<I>
where
    Self: Ord,
{
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Implement `Ord` for `Node`. Only the current key is compared.
impl<I: HummockIterator> Ord for Node<I> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note: `BinaryHeap` is a max-heap, so to get min-heap behavior for forward iteration,
        // the comparison order is reversed here.

        match I::Direction::direction() {
            DirectionEnum::Forward => other.iter.key().cmp(&self.iter.key()),
            DirectionEnum::Backward => self.iter.key().cmp(&other.iter.key()),
        }
    }
}

impl<I: HummockIterator> PartialEq for Node<I> {
    fn eq(&self, other: &Self) -> bool {
        self.iter.key() == other.iter.key()
    }
}

/// Merges multiple inner iterators into a single sorted iterator, a.k.a. `MergeIterator`.
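///
/// A minimal usage sketch (assuming `iters` is some collection of already-constructed inner
/// iterators):
///
/// ```ignore
/// let mut merge_iter = MergeIterator::new(iters);
/// merge_iter.rewind().await?;
/// while merge_iter.is_valid() {
///     let _key = merge_iter.key();
///     let _value = merge_iter.value();
///     merge_iter.next().await?;
/// }
/// ```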
pub struct MergeIterator<I: HummockIterator> {
    /// Invalid or uninitialized iterators.
    unused_iters: LinkedList<Node<I>>,

    /// The heap for merge sort.
    heap: BinaryHeap<Node<I>>,
}

impl<I: HummockIterator> MergeIterator<I> {
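    /// Collect statistics from every inner iterator, both those currently in the heap and those
    /// in the unused list.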
    fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) {
        for node in &self.heap {
            node.iter.collect_local_statistic(stats);
        }
        for node in &self.unused_iters {
            node.iter.collect_local_statistic(stats);
        }
    }
}

impl<I: HummockIterator> MergeIterator<I> {
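    /// Create a `MergeIterator` over the given iterators. All iterators start in the unused
    /// list; `rewind` or `seek` must be called to build the heap before reading.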
    pub fn new(iterators: impl IntoIterator<Item = I>) -> Self {
        Self::create(iterators)
    }

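    /// Create a `MergeIterator` for compaction. Currently behaves the same as [`Self::new`].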
    pub fn for_compactor(iterators: impl IntoIterator<Item = I>) -> Self {
        Self::create(iterators)
    }

    fn create(iterators: impl IntoIterator<Item = I>) -> Self {
        Self {
            unused_iters: iterators.into_iter().map(|iter| Node { iter }).collect(),
            heap: BinaryHeap::new(),
        }
    }
}

impl MergeIterator<SharedBufferBatchIterator<Forward>> {
    /// Used in `merge_imms_in_memory` to merge immutable memtables.
    pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
        self.heap
            .peek()
            .expect("no inner iter for imm merge")
            .iter
            .current_key_entry()
    }
}

impl<I: HummockIterator> MergeIterator<I>
where
    Node<I>: Ord,
{
    /// Moves all iterators from the `heap` to the linked list.
    fn reset_heap(&mut self) {
        self.unused_iters.extend(self.heap.drain());
    }

    /// After `seek` or `rewind` has been called on the iterators in `unused_iters`, call this
    /// function to construct a new heap from the iterators that are still valid.
    fn build_heap(&mut self) {
        assert!(self.heap.is_empty());

        self.heap = self
            .unused_iters
            .extract_if(|i| i.iter.is_valid())
            .collect();
    }
}

/// A wrapper over the heap's `PeekMut`.
///
/// Several panics have been caused by future cancellation: when a future holding a `PeekMut` is
/// cancelled and dropped, dropping the `PeekMut` triggers a comparison between the top node and
/// the node below it, and may call `key()` on a top-node iterator that is in an intermediate,
/// inconsistent state.
///
/// When a `PeekMut` is wrapped by this guard and the guard is dropped, `PeekMut::pop` is called on
/// the `PeekMut` and the popped node is appended to the linked list that collects the unused
/// nodes. This way, dropping the future that holds the guard never runs the `PeekMut`'s own
/// `drop`, so no unexpected `key()` call happens during heap comparison.
///
/// In normal usage, after finishing with the `PeekMut`, explicitly call `guard.used()` in every
/// branch. To pop the `PeekMut`, simply call `guard.pop()`.
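///
/// A sketch of the pattern used in this file (with `heap` and `unused` standing in for the merge
/// iterator's fields):
///
/// ```ignore
/// let mut node = PeekMutGuard::peek_mut(&mut heap, &mut unused).expect("non-empty heap");
/// node.iter.next().await?;
/// if node.iter.is_valid() {
///     node.used(); // done with the top node; dropping the `PeekMut` re-sifts the heap
/// } else {
///     let node = node.pop(); // remove the exhausted iterator from the heap
///     unused.push_back(node);
/// }
/// ```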
struct PeekMutGuard<'a, T: Ord> {
    peek: Option<PeekMut<'a, T>>,
    unused: &'a mut LinkedList<T>,
}

impl<'a, T: Ord> PeekMutGuard<'a, T> {
    /// Call `peek_mut` on the top of the heap and return a guard over the `PeekMut` if the heap
    /// is not empty.
    fn peek_mut(heap: &'a mut BinaryHeap<T>, unused: &'a mut LinkedList<T>) -> Option<Self> {
        heap.peek_mut().map(|peek| Self {
            peek: Some(peek),
            unused,
        })
    }

    /// Call `pop` on the `PeekMut`.
    fn pop(mut self) -> T {
        PeekMut::pop(self.peek.take().expect("should not be None"))
    }

    /// Mark the `PeekMut` as used. `drop` will be called on the `PeekMut` directly.
    fn used(mut self) {
        self.peek.take().expect("should not be None");
    }
}

impl<T: Ord> Deref for PeekMutGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.peek.as_ref().expect("should not be None")
    }
}

impl<T: Ord> DerefMut for PeekMutGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.peek.as_mut().expect("should not be None")
    }
}

impl<T: Ord> Drop for PeekMutGuard<'_, T> {
    /// If neither `pop` nor `used` was called before the guard is dropped, call `PeekMut::pop` on
    /// the `PeekMut` and recycle the node to the unused list.
    fn drop(&mut self) {
        if let Some(peek) = self.peek.take() {
            tracing::debug!(
                "PeekMut is dropped without being used. May be caused by future cancellation"
            );
            let top = PeekMut::pop(peek);
            self.unused.push_back(top);
        }
    }
}

impl MergeIterator<SharedBufferBatchIterator<Forward>> {
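    /// Advance the top iterator to its next key and rebalance the heap, recycling the iterator
    /// into `unused_iters` if it becomes invalid.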
    pub(crate) fn advance_peek_to_next_key(&mut self) {
        let mut node =
            PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");

        node.iter.advance_to_next_key();

        if !node.iter.is_valid() {
            // Put back to `unused_iters`
            let node = node.pop();
            self.unused_iters.push_back(node);
        } else {
            // This will update the heap top.
            node.used();
        }
    }

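    /// Rewind synchronously. This relies on the inner shared buffer iterators resolving `rewind`
    /// immediately without yielding; otherwise the call panics.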
    pub(crate) fn rewind_no_await(&mut self) {
        self.rewind()
            .now_or_never()
            .expect("should not be pending")
            .expect("should not err")
    }
}

impl<I: HummockIterator> HummockIterator for MergeIterator<I>
where
    Node<I>: Ord,
{
    type Direction = I::Direction;

    async fn next(&mut self) -> HummockResult<()> {
        let mut node =
            PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");

        // WARNING: within the scope of `BinaryHeap::PeekMut`, every return path must be handled
        // carefully. Once an iterator enters an invalid state, it must be removed from the heap
        // before returning.

        match node.iter.next().await {
            Ok(_) => {}
            Err(e) => {
                // If the iterator returns an error, clear the heap so that this merge iterator
                // becomes invalid.
                node.pop();
                self.heap.clear();
                return Err(e);
            }
        }

        if !node.iter.is_valid() {
            // Put back to `unused_iters`
            let node = node.pop();
            self.unused_iters.push_back(node);
        } else {
            // This will update the heap top.
            node.used();
        }

        Ok(())
    }

    fn key(&self) -> FullKey<&[u8]> {
        self.heap.peek().expect("no inner iter").iter.key()
    }

    fn value(&self) -> HummockValue<&[u8]> {
        self.heap.peek().expect("no inner iter").iter.value()
    }

    fn is_valid(&self) -> bool {
        self.heap.peek().is_some_and(|n| n.iter.is_valid())
    }

    async fn rewind(&mut self) -> HummockResult<()> {
        self.reset_heap();
        futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind()))
            .await?;
        self.build_heap();
        Ok(())
    }

    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        self.reset_heap();
        futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key)))
            .await?;
        self.build_heap();
        Ok(())
    }

    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        self.collect_local_statistic_impl(stats);
    }

    fn value_meta(&self) -> ValueMeta {
        self.heap.peek().expect("no inner iter").iter.value_meta()
    }
}