risingwave_storage/hummock/iterator/
merge_inner.rs1use std::collections::binary_heap::PeekMut;
16use std::collections::{BinaryHeap, LinkedList};
17use std::ops::{Deref, DerefMut};
18
19use risingwave_hummock_sdk::key::FullKey;
20
21use crate::hummock::HummockResult;
22use crate::hummock::iterator::{
23 DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
24};
25use crate::hummock::value::HummockValue;
26use crate::monitor::StoreLocalStatistic;
27
28pub struct Node<I: HummockIterator> {
29 iter: I,
30}
31
32impl<I: HummockIterator> Eq for Node<I> where Self: PartialEq {}
33impl<I: HummockIterator> PartialOrd for Node<I>
34where
35 Self: Ord,
36{
37 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38 Some(self.cmp(other))
39 }
40}
41
42impl<I: HummockIterator> Ord for Node<I> {
44 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
45 match I::Direction::direction() {
49 DirectionEnum::Forward => other.iter.key().cmp(&self.iter.key()),
50 DirectionEnum::Backward => self.iter.key().cmp(&other.iter.key()),
51 }
52 }
53}
54
55impl<I: HummockIterator> PartialEq for Node<I> {
56 fn eq(&self, other: &Self) -> bool {
57 self.iter.key() == other.iter.key()
58 }
59}
60
61pub struct MergeIterator<I: HummockIterator> {
63 unused_iters: LinkedList<Node<I>>,
65
66 heap: BinaryHeap<Node<I>>,
68}
69
70impl<I: HummockIterator> MergeIterator<I> {
71 fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) {
72 for node in &self.heap {
73 node.iter.collect_local_statistic(stats);
74 }
75 for node in &self.unused_iters {
76 node.iter.collect_local_statistic(stats);
77 }
78 }
79}
80
81impl<I: HummockIterator> MergeIterator<I> {
82 pub fn new(iterators: impl IntoIterator<Item = I>) -> Self {
83 Self::create(iterators)
84 }
85
86 pub fn for_compactor(iterators: impl IntoIterator<Item = I>) -> Self {
87 Self::create(iterators)
88 }
89
90 fn create(iterators: impl IntoIterator<Item = I>) -> Self {
91 Self {
92 unused_iters: iterators.into_iter().map(|iter| Node { iter }).collect(),
93 heap: BinaryHeap::new(),
94 }
95 }
96}
97
98impl<I: HummockIterator> MergeIterator<I>
99where
100 Node<I>: Ord,
101{
102 fn reset_heap(&mut self) {
104 self.unused_iters.extend(self.heap.drain());
105 }
106
107 fn build_heap(&mut self) {
110 assert!(self.heap.is_empty());
111
112 self.heap = self
113 .unused_iters
114 .extract_if(|i| i.iter.is_valid())
115 .collect();
116 }
117}
118
/// Guard around a `BinaryHeap` `PeekMut` handle, paired with a list that can
/// receive the heap top if the guard is dropped without being consumed.
struct PeekMutGuard<'a, T>
where
    T: Ord,
{
    /// Handle to the heap top; `None` once the guard has been consumed.
    peek: Option<PeekMut<'a, T>>,
    /// List that receives the heap top when the guard is dropped unconsumed.
    unused: &'a mut LinkedList<T>,
}
137
138impl<'a, T: Ord> PeekMutGuard<'a, T> {
139 fn peek_mut(heap: &'a mut BinaryHeap<T>, unused: &'a mut LinkedList<T>) -> Option<Self> {
142 heap.peek_mut().map(|peek| Self {
143 peek: Some(peek),
144 unused,
145 })
146 }
147
148 fn pop(mut self) -> T {
150 PeekMut::pop(self.peek.take().expect("should not be None"))
151 }
152
153 fn used(mut self) {
155 self.peek.take().expect("should not be None");
156 }
157}
158
159impl<T: Ord> Deref for PeekMutGuard<'_, T> {
160 type Target = T;
161
162 fn deref(&self) -> &Self::Target {
163 self.peek.as_ref().expect("should not be None")
164 }
165}
166
167impl<T: Ord> DerefMut for PeekMutGuard<'_, T> {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 self.peek.as_mut().expect("should not be None")
170 }
171}
172
173impl<T: Ord> Drop for PeekMutGuard<'_, T> {
174 fn drop(&mut self) {
177 if let Some(peek) = self.peek.take() {
178 tracing::debug!(
179 "PeekMut are dropped without used. May be caused by future cancellation"
180 );
181 let top = PeekMut::pop(peek);
182 self.unused.push_back(top);
183 }
184 }
185}
186
187impl<I: HummockIterator> HummockIterator for MergeIterator<I>
188where
189 Node<I>: Ord,
190{
191 type Direction = I::Direction;
192
193 async fn next(&mut self) -> HummockResult<()> {
194 let mut node =
195 PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter");
196
197 match node.iter.next().await {
202 Ok(_) => {}
203 Err(e) => {
204 node.pop();
207 self.heap.clear();
208 return Err(e);
209 }
210 }
211
212 if !node.iter.is_valid() {
213 let node = node.pop();
215 self.unused_iters.push_back(node);
216 } else {
217 node.used();
219 }
220
221 Ok(())
222 }
223
224 fn key(&self) -> FullKey<&[u8]> {
225 self.heap.peek().expect("no inner iter").iter.key()
226 }
227
228 fn value(&self) -> HummockValue<&[u8]> {
229 self.heap.peek().expect("no inner iter").iter.value()
230 }
231
232 fn is_valid(&self) -> bool {
233 self.heap.peek().is_some_and(|n| n.iter.is_valid())
234 }
235
236 async fn rewind(&mut self) -> HummockResult<()> {
237 self.reset_heap();
238 futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.rewind()))
239 .await?;
240 self.build_heap();
241 Ok(())
242 }
243
244 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
245 self.reset_heap();
246 futures::future::try_join_all(self.unused_iters.iter_mut().map(|x| x.iter.seek(key)))
247 .await?;
248 self.build_heap();
249 Ok(())
250 }
251
252 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
253 self.collect_local_statistic_impl(stats);
254 }
255
256 fn value_meta(&self) -> ValueMeta {
257 self.heap.peek().expect("no inner iter").iter.value_meta()
258 }
259}