risingwave_storage/hummock/iterator/
merge_inner.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::binary_heap::PeekMut;
16use std::collections::{BinaryHeap, LinkedList};
17use std::ops::{Deref, DerefMut};
18
19use risingwave_hummock_sdk::key::FullKey;
20
21use crate::hummock::HummockResult;
22use crate::hummock::iterator::{
23    DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
24};
25use crate::hummock::value::HummockValue;
26use crate::monitor::StoreLocalStatistic;
27
28pub struct Node<I: HummockIterator> {
29    iter: I,
30}
31
32impl<I: HummockIterator> Eq for Node<I> where Self: PartialEq {}
33impl<I: HummockIterator> PartialOrd for Node<I>
34where
35    Self: Ord,
36{
37    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38        Some(self.cmp(other))
39    }
40}
41
42/// Implement `Ord` for unordered iter node. Only compare the key.
43impl<I: HummockIterator> Ord for Node<I> {
44    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45        // Note: to implement min-heap by using max-heap internally, the comparing
46        // order should be reversed.
47
48        match I::Direction::direction() {
49            DirectionEnum::Forward => other.iter.key().cmp(&self.iter.key()),
50            DirectionEnum::Backward => self.iter.key().cmp(&other.iter.key()),
51        }
52    }
53}
54
55impl<I: HummockIterator> PartialEq for Node<I> {
56    fn eq(&self, other: &Self) -> bool {
57        self.iter.key() == other.iter.key()
58    }
59}
60
61/// Iterates on multiple iterators, a.k.a. `MergeIterator`.
62pub struct MergeIterator<I: HummockIterator> {
63    /// Invalid or non-initialized iterators.
64    unused_iters: LinkedList<Node<I>>,
65
66    /// The heap for merge sort.
67    heap: BinaryHeap<Node<I>>,
68}
69
70impl<I: HummockIterator> MergeIterator<I> {
71    fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) {
72        for node in &self.heap {
73            node.iter.collect_local_statistic(stats);
74        }
75        for node in &self.unused_iters {
76            node.iter.collect_local_statistic(stats);
77        }
78    }
79}
80
81impl<I: HummockIterator> MergeIterator<I> {
82    pub fn new(iterators: impl IntoIterator<Item = I>) -> Self {
83        Self::create(iterators)
84    }
85
86    pub fn for_compactor(iterators: impl IntoIterator<Item = I>) -> Self {
87        Self::create(iterators)
88    }
89
90    fn create(iterators: impl IntoIterator<Item = I>) -> Self {
91        Self {
92            unused_iters: iterators.into_iter().map(|iter| Node { iter }).collect(),
93            heap: BinaryHeap::new(),
94        }
95    }
96}
97
98impl<I: HummockIterator> MergeIterator<I>
99where
100    Node<I>: Ord,
101{
102    /// Moves all iterators from the `heap` to the linked list.
103    fn reset_heap(&mut self) {
104        self.unused_iters.extend(self.heap.drain());
105    }
106
107    /// After some iterators in `unused_iterators` are sought or rewound, calls this function
108    /// to construct a new heap using the valid ones.
109    fn build_heap(&mut self) {
110        assert!(self.heap.is_empty());
111
112        self.heap = self
113            .unused_iters
114            .extract_if(|i| i.iter.is_valid())
115            .collect();
116    }
117}
118
/// Guard around the heap's `PeekMut` that makes dropping it safe under future
/// cancellation.
///
/// Dropping a plain `PeekMut` may compare the top node with the nodes below it,
/// which calls `key()` on the top iterator. When the future holding the `PeekMut`
/// is cancelled, that iterator can be in an intermediate, inconsistent state, and
/// this comparison has caused panics.
///
/// If the guard is dropped while it still owns the `PeekMut`, it calls
/// `PeekMut::pop` instead and pushes the popped node onto the list of unused
/// nodes, so the `PeekMut` itself is never dropped and no unexpected `key()` call
/// is made for heap comparison.
///
/// In normal usage every branch must finish with the guard explicitly: call
/// `guard.used()` once the top node has been updated in place, or `guard.pop()`
/// to remove the top node from the heap.
struct PeekMutGuard<'a, T>
where
    T: Ord,
{
    /// The wrapped `PeekMut`; `None` once `pop` or `used` has taken it.
    peek: Option<PeekMut<'a, T>>,
    /// List that receives the top node if the guard is dropped without being used.
    unused: &'a mut LinkedList<T>,
}
137
138impl<'a, T: Ord> PeekMutGuard<'a, T> {
139    /// Call `peek_mut` on the top of heap and return a guard over the `PeekMut` if the heap is not
140    /// empty.
141    fn peek_mut(heap: &'a mut BinaryHeap<T>, unused: &'a mut LinkedList<T>) -> Option<Self> {
142        heap.peek_mut().map(|peek| Self {
143            peek: Some(peek),
144            unused,
145        })
146    }
147
148    /// Call `pop` on the `PeekMut`.
149    fn pop(mut self) -> T {
150        PeekMut::pop(self.peek.take().expect("should not be None"))
151    }
152
153    /// Mark finish using the `PeekMut`. `drop` will be called on the `PeekMut` directly.
154    fn used(mut self) {
155        self.peek.take().expect("should not be None");
156    }
157}
158
159impl<T: Ord> Deref for PeekMutGuard<'_, T> {
160    type Target = T;
161
162    fn deref(&self) -> &Self::Target {
163        self.peek.as_ref().expect("should not be None")
164    }
165}
166
167impl<T: Ord> DerefMut for PeekMutGuard<'_, T> {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        self.peek.as_mut().expect("should not be None")
170    }
171}
172
173impl<T: Ord> Drop for PeekMutGuard<'_, T> {
174    /// When the guard is dropped, if `pop` or `used` is not called before it is dropped, we will
175    /// call `PeekMut::pop` on the `PeekMut` and recycle the node to the unused list.
176    fn drop(&mut self) {
177        if let Some(peek) = self.peek.take() {
178            tracing::debug!(
179                "PeekMut are dropped without used. May be caused by future cancellation"
180            );
181            let top = PeekMut::pop(peek);
182            self.unused.push_back(top);
183        }
184    }
185}
186
187impl<I: HummockIterator> HummockIterator for MergeIterator<I>
188where
189    Node<I>: Ord,
190{
191    type Direction = I::Direction;
192
193    async fn next(&mut self) -> HummockResult<()> {
194        let mut node =
195            PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");
196
197        // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places of
198        // return. Once the iterator enters an invalid state, we should remove it from heap
199        // before returning.
200
201        match node.iter.next().await {
202            Ok(_) => {}
203            Err(e) => {
204                // If the iterator returns error, we should clear the heap, so that this
205                // iterator becomes invalid.
206                node.pop();
207                self.heap.clear();
208                return Err(e);
209            }
210        }
211
212        if !node.iter.is_valid() {
213            // Put back to `unused_iters`
214            let node = node.pop();
215            self.unused_iters.push_back(node);
216        } else {
217            // This will update the heap top.
218            node.used();
219        }
220
221        Ok(())
222    }
223
224    fn key(&self) -> FullKey<&[u8]> {
225        self.heap.peek().expect("no inner iter").iter.key()
226    }
227
228    fn value(&self) -> HummockValue<&[u8]> {
229        self.heap.peek().expect("no inner iter").iter.value()
230    }
231
232    fn is_valid(&self) -> bool {
233        self.heap.peek().is_some_and(|n| n.iter.is_valid())
234    }
235
236    async fn rewind(&mut self) -> HummockResult<()> {
237        self.reset_heap();
238        futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind()))
239            .await?;
240        self.build_heap();
241        Ok(())
242    }
243
244    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
245        self.reset_heap();
246        futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key)))
247            .await?;
248        self.build_heap();
249        Ok(())
250    }
251
252    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
253        self.collect_local_statistic_impl(stats);
254    }
255
256    fn value_meta(&self) -> ValueMeta {
257        self.heap.peek().expect("no inner iter").iter.value_meta()
258    }
259}