risingwave_storage/hummock/iterator/
merge_inner.rs1use std::collections::binary_heap::PeekMut;
16use std::collections::{BinaryHeap, LinkedList};
17use std::ops::{Deref, DerefMut};
18
19use futures::FutureExt;
20use risingwave_hummock_sdk::key::FullKey;
21
22use super::Forward;
23use crate::hummock::HummockResult;
24use crate::hummock::iterator::{
25 DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
26};
27use crate::hummock::shared_buffer::shared_buffer_batch::{
28 SharedBufferBatchIterator, SharedBufferVersionedEntryRef,
29};
30use crate::hummock::value::HummockValue;
31use crate::monitor::StoreLocalStatistic;
32
33pub struct Node<I: HummockIterator> {
34 iter: I,
35}
36
37impl<I: HummockIterator> Eq for Node<I> where Self: PartialEq {}
38impl<I: HummockIterator> PartialOrd for Node<I>
39where
40 Self: Ord,
41{
42 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
43 Some(self.cmp(other))
44 }
45}
46
47impl<I: HummockIterator> Ord for Node<I> {
49 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
50 match I::Direction::direction() {
54 DirectionEnum::Forward => other.iter.key().cmp(&self.iter.key()),
55 DirectionEnum::Backward => self.iter.key().cmp(&other.iter.key()),
56 }
57 }
58}
59
60impl<I: HummockIterator> PartialEq for Node<I> {
61 fn eq(&self, other: &Self) -> bool {
62 self.iter.key() == other.iter.key()
63 }
64}
65
66pub struct MergeIterator<I: HummockIterator> {
68 unused_iters: LinkedList<Node<I>>,
70
71 heap: BinaryHeap<Node<I>>,
73}
74
75impl<I: HummockIterator> MergeIterator<I> {
76 fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) {
77 for node in &self.heap {
78 node.iter.collect_local_statistic(stats);
79 }
80 for node in &self.unused_iters {
81 node.iter.collect_local_statistic(stats);
82 }
83 }
84}
85
86impl<I: HummockIterator> MergeIterator<I> {
87 pub fn new(iterators: impl IntoIterator<Item = I>) -> Self {
88 Self::create(iterators)
89 }
90
91 pub fn for_compactor(iterators: impl IntoIterator<Item = I>) -> Self {
92 Self::create(iterators)
93 }
94
95 fn create(iterators: impl IntoIterator<Item = I>) -> Self {
96 Self {
97 unused_iters: iterators.into_iter().map(|iter| Node { iter }).collect(),
98 heap: BinaryHeap::new(),
99 }
100 }
101}
102
103impl MergeIterator<SharedBufferBatchIterator<Forward>> {
104 pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
106 self.heap
107 .peek()
108 .expect("no inner iter for imm merge")
109 .iter
110 .current_key_entry()
111 }
112}
113
114impl<I: HummockIterator> MergeIterator<I>
115where
116 Node<I>: Ord,
117{
118 fn reset_heap(&mut self) {
120 self.unused_iters.extend(self.heap.drain());
121 }
122
123 fn build_heap(&mut self) {
126 assert!(self.heap.is_empty());
127
128 self.heap = self
129 .unused_iters
130 .extract_if(|i| i.iter.is_valid())
131 .collect();
132 }
133}
134
/// Pairs a mutable handle on the top of a `BinaryHeap` with the list that
/// receives the top element if the handle is abandoned.
struct PeekMutGuard<'a, T>
where
    T: Ord,
{
    /// Handle on the heap's top element; `None` once the guard has been
    /// consumed through one of its methods.
    peek: Option<PeekMut<'a, T>>,
    /// List that elements removed from the heap can be pushed onto.
    unused: &'a mut LinkedList<T>,
}
153
154impl<'a, T: Ord> PeekMutGuard<'a, T> {
155 fn peek_mut(heap: &'a mut BinaryHeap<T>, unused: &'a mut LinkedList<T>) -> Option<Self> {
158 heap.peek_mut().map(|peek| Self {
159 peek: Some(peek),
160 unused,
161 })
162 }
163
164 fn pop(mut self) -> T {
166 PeekMut::pop(self.peek.take().expect("should not be None"))
167 }
168
169 fn used(mut self) {
171 self.peek.take().expect("should not be None");
172 }
173}
174
175impl<T: Ord> Deref for PeekMutGuard<'_, T> {
176 type Target = T;
177
178 fn deref(&self) -> &Self::Target {
179 self.peek.as_ref().expect("should not be None")
180 }
181}
182
183impl<T: Ord> DerefMut for PeekMutGuard<'_, T> {
184 fn deref_mut(&mut self) -> &mut Self::Target {
185 self.peek.as_mut().expect("should not be None")
186 }
187}
188
189impl<T: Ord> Drop for PeekMutGuard<'_, T> {
190 fn drop(&mut self) {
193 if let Some(peek) = self.peek.take() {
194 tracing::debug!(
195 "PeekMut are dropped without used. May be caused by future cancellation"
196 );
197 let top = PeekMut::pop(peek);
198 self.unused.push_back(top);
199 }
200 }
201}
202
203impl MergeIterator<SharedBufferBatchIterator<Forward>> {
204 pub(crate) fn advance_peek_to_next_key(&mut self) {
205 let mut node =
206 PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");
207
208 node.iter.advance_to_next_key();
209
210 if !node.iter.is_valid() {
211 let node = node.pop();
213 self.unused_iters.push_back(node);
214 } else {
215 node.used();
217 }
218 }
219
220 pub(crate) fn rewind_no_await(&mut self) {
221 self.rewind()
222 .now_or_never()
223 .expect("should not pending")
224 .expect("should not err")
225 }
226}
227
228impl<I: HummockIterator> HummockIterator for MergeIterator<I>
229where
230 Node<I>: Ord,
231{
232 type Direction = I::Direction;
233
234 async fn next(&mut self) -> HummockResult<()> {
235 let mut node =
236 PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");
237
238 match node.iter.next().await {
243 Ok(_) => {}
244 Err(e) => {
245 node.pop();
248 self.heap.clear();
249 return Err(e);
250 }
251 }
252
253 if !node.iter.is_valid() {
254 let node = node.pop();
256 self.unused_iters.push_back(node);
257 } else {
258 node.used();
260 }
261
262 Ok(())
263 }
264
265 fn key(&self) -> FullKey<&[u8]> {
266 self.heap.peek().expect("no inner iter").iter.key()
267 }
268
269 fn value(&self) -> HummockValue<&[u8]> {
270 self.heap.peek().expect("no inner iter").iter.value()
271 }
272
273 fn is_valid(&self) -> bool {
274 self.heap.peek().is_some_and(|n| n.iter.is_valid())
275 }
276
277 async fn rewind(&mut self) -> HummockResult<()> {
278 self.reset_heap();
279 futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind()))
280 .await?;
281 self.build_heap();
282 Ok(())
283 }
284
285 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
286 self.reset_heap();
287 futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key)))
288 .await?;
289 self.build_heap();
290 Ok(())
291 }
292
293 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
294 self.collect_local_statistic_impl(stats);
295 }
296
297 fn value_meta(&self) -> ValueMeta {
298 self.heap.peek().expect("no inner iter").iter.value_meta()
299 }
300}