risingwave_storage/hummock/iterator/
mod.rs1use std::future::Future;
16use std::marker::PhantomData;
17use std::ops::{Deref, DerefMut};
18use std::sync::Arc;
19
20use more_asserts::{assert_gt, assert_lt};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22
23use super::{
24 HummockResult, HummockValue, SstableIteratorReadOptions, SstableIteratorType, SstableStoreRef,
25};
26
27mod forward_concat;
28pub use forward_concat::*;
29mod backward_concat;
30mod concat_inner;
31pub use backward_concat::*;
32pub use concat_inner::ConcatIteratorInner;
33mod backward_merge;
34
35mod backward_user;
36pub use backward_user::*;
37mod forward_merge;
38
39pub mod forward_user;
40mod merge_inner;
41pub use forward_user::*;
42pub use merge_inner::MergeIterator;
43use risingwave_hummock_sdk::EpochWithGap;
44use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
45
46use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third};
47
48pub mod change_log;
49mod skip_watermark;
50#[cfg(any(test, feature = "test"))]
51pub mod test_utils;
52
53use risingwave_common::catalog::TableId;
54pub use skip_watermark::*;
55
56use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
57use crate::monitor::StoreLocalStatistic;
58
/// Optional identifiers attached to the value an iterator is currently positioned at, as
/// returned by `HummockIterator::value_meta`.
///
/// `ValueMeta::default()` leaves both fields `None`; that is what iterators without such
/// information (e.g. `FromRustIterator`) return.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ValueMeta {
    /// NOTE(review): presumably the id of the storage object the value was read from — confirm.
    pub object_id: Option<u64>,
    /// NOTE(review): presumably the id of the block inside that object — confirm.
    pub block_id: Option<u64>,
}
64
65pub trait HummockIterator: Send {
72 type Direction: HummockIteratorDirection;
73 fn next(&mut self) -> impl Future<Output = HummockResult<()>> + Send + '_;
85
86 fn key(&self) -> FullKey<&[u8]>;
95
96 fn value(&self) -> HummockValue<&[u8]>;
106
107 fn is_valid(&self) -> bool;
113
114 fn rewind(&mut self) -> impl Future<Output = HummockResult<()>> + Send + '_;
121
122 fn seek<'a>(
130 &'a mut self,
131 key: FullKey<&'a [u8]>,
132 ) -> impl Future<Output = HummockResult<()>> + Send;
133
134 fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic);
136
137 fn value_meta(&self) -> ValueMeta;
139}
140
141pub struct PhantomHummockIterator<D: HummockIteratorDirection> {
143 _phantom: PhantomData<D>,
144}
145
146impl<D: HummockIteratorDirection> HummockIterator for PhantomHummockIterator<D> {
147 type Direction = D;
148
149 async fn next(&mut self) -> HummockResult<()> {
150 unreachable!()
151 }
152
153 fn key(&self) -> FullKey<&[u8]> {
154 unreachable!()
155 }
156
157 fn value(&self) -> HummockValue<&[u8]> {
158 unreachable!()
159 }
160
161 fn is_valid(&self) -> bool {
162 unreachable!()
163 }
164
165 async fn rewind(&mut self) -> HummockResult<()> {
166 unreachable!()
167 }
168
169 async fn seek<'a>(&'a mut self, _key: FullKey<&'a [u8]>) -> HummockResult<()> {
170 unreachable!()
171 }
172
173 fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
174
175 fn value_meta(&self) -> ValueMeta {
176 unreachable!()
177 }
178}
179
180pub enum HummockIteratorUnion<
193 D: HummockIteratorDirection,
194 I1: HummockIterator<Direction = D>,
195 I2: HummockIterator<Direction = D>,
196 I3: HummockIterator<Direction = D> = PhantomHummockIterator<D>,
197 I4: HummockIterator<Direction = D> = PhantomHummockIterator<D>,
198> {
199 First(I1),
200 Second(I2),
201 Third(I3),
202 Fourth(I4),
203}
204
205impl<
206 D: HummockIteratorDirection,
207 I1: HummockIterator<Direction = D>,
208 I2: HummockIterator<Direction = D>,
209 I3: HummockIterator<Direction = D>,
210 I4: HummockIterator<Direction = D>,
211> HummockIterator for HummockIteratorUnion<D, I1, I2, I3, I4>
212{
213 type Direction = D;
214
215 async fn next(&mut self) -> HummockResult<()> {
216 match self {
217 First(iter) => iter.next().await,
218 Second(iter) => iter.next().await,
219 Third(iter) => iter.next().await,
220 Fourth(iter) => iter.next().await,
221 }
222 }
223
224 fn key(&self) -> FullKey<&[u8]> {
225 match self {
226 First(iter) => iter.key(),
227 Second(iter) => iter.key(),
228 Third(iter) => iter.key(),
229 Fourth(iter) => iter.key(),
230 }
231 }
232
233 fn value(&self) -> HummockValue<&[u8]> {
234 match self {
235 First(iter) => iter.value(),
236 Second(iter) => iter.value(),
237 Third(iter) => iter.value(),
238 Fourth(iter) => iter.value(),
239 }
240 }
241
242 fn is_valid(&self) -> bool {
243 match self {
244 First(iter) => iter.is_valid(),
245 Second(iter) => iter.is_valid(),
246 Third(iter) => iter.is_valid(),
247 Fourth(iter) => iter.is_valid(),
248 }
249 }
250
251 async fn rewind(&mut self) -> HummockResult<()> {
252 match self {
253 First(iter) => iter.rewind().await,
254 Second(iter) => iter.rewind().await,
255 Third(iter) => iter.rewind().await,
256 Fourth(iter) => iter.rewind().await,
257 }
258 }
259
260 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
261 match self {
262 First(iter) => iter.seek(key).await,
263 Second(iter) => iter.seek(key).await,
264 Third(iter) => iter.seek(key).await,
265 Fourth(iter) => iter.seek(key).await,
266 }
267 }
268
269 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
270 match self {
271 First(iter) => iter.collect_local_statistic(stats),
272 Second(iter) => iter.collect_local_statistic(stats),
273 Third(iter) => iter.collect_local_statistic(stats),
274 Fourth(iter) => iter.collect_local_statistic(stats),
275 }
276 }
277
278 fn value_meta(&self) -> ValueMeta {
279 match self {
280 First(iter) => iter.value_meta(),
281 Second(iter) => iter.value_meta(),
282 Third(iter) => iter.value_meta(),
283 Fourth(iter) => iter.value_meta(),
284 }
285 }
286}
287
288impl<I: HummockIterator> HummockIterator for Box<I> {
289 type Direction = I::Direction;
290
291 async fn next(&mut self) -> HummockResult<()> {
292 (*self).deref_mut().next().await
293 }
294
295 fn key(&self) -> FullKey<&[u8]> {
296 (*self).deref().key()
297 }
298
299 fn value(&self) -> HummockValue<&[u8]> {
300 (*self).deref().value()
301 }
302
303 fn is_valid(&self) -> bool {
304 (*self).deref().is_valid()
305 }
306
307 async fn rewind(&mut self) -> HummockResult<()> {
308 (*self).deref_mut().rewind().await
309 }
310
311 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
312 (*self).deref_mut().seek(key).await
313 }
314
315 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
316 (*self).deref().collect_local_statistic(stats);
317 }
318
319 fn value_meta(&self) -> ValueMeta {
320 (*self).deref().value_meta()
321 }
322}
323
324pub enum RustIteratorOfBuilder<'a, B: RustIteratorBuilder> {
325 Seek(B::SeekIter<'a>),
326 Rewind(B::RewindIter<'a>),
327}
328
329impl<'a, B: RustIteratorBuilder> Iterator for RustIteratorOfBuilder<'a, B> {
330 type Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>);
331
332 fn next(&mut self) -> Option<Self::Item> {
333 match self {
334 RustIteratorOfBuilder::Seek(i) => i.next(),
335 RustIteratorOfBuilder::Rewind(i) => i.next(),
336 }
337 }
338}
339
340pub trait RustIteratorBuilder: Send + Sync + 'static {
341 type Iterable: Send + Sync;
342 type Direction: HummockIteratorDirection;
343 type RewindIter<'a>: Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)>
344 + Send
345 + Sync
346 + 'a;
347 type SeekIter<'a>: Iterator<Item = (TableKey<&'a [u8]>, HummockValue<&'a [u8]>)>
348 + Send
349 + Sync
350 + 'a;
351
352 fn seek<'a>(iterable: &'a Self::Iterable, seek_key: TableKey<&[u8]>) -> Self::SeekIter<'a>;
353 fn rewind(iterable: &Self::Iterable) -> Self::RewindIter<'_>;
354}
355
356pub struct FromRustIterator<'a, B: RustIteratorBuilder> {
357 inner: &'a B::Iterable,
358 #[expect(clippy::type_complexity)]
359 iter: Option<(
360 RustIteratorOfBuilder<'a, B>,
361 TableKey<&'a [u8]>,
362 HummockValue<&'a [u8]>,
363 )>,
364 epoch: EpochWithGap,
365 table_id: TableId,
366}
367
368impl<'a, B: RustIteratorBuilder> FromRustIterator<'a, B> {
369 pub fn new(inner: &'a B::Iterable, epoch: EpochWithGap, table_id: TableId) -> Self {
370 Self {
371 inner,
372 iter: None,
373 epoch,
374 table_id,
375 }
376 }
377
378 async fn seek_inner<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
379 match self.table_id.cmp(&key.user_key.table_id) {
380 std::cmp::Ordering::Less => {
381 self.iter = None;
382 return Ok(());
383 }
384 std::cmp::Ordering::Greater => {
385 return self.rewind().await;
386 }
387 _ => {}
388 }
389 let mut iter = B::seek(self.inner, key.user_key.table_key);
390 match iter.next() {
391 Some((first_key, first_value)) => {
392 if first_key.eq(&key.user_key.table_key) && self.epoch > key.epoch_with_gap {
393 match iter.next() {
396 Some((next_key, next_value)) => {
397 assert_gt!(next_key, first_key);
398 self.iter =
399 Some((RustIteratorOfBuilder::Seek(iter), next_key, next_value));
400 }
401 None => {
402 self.iter = None;
403 }
404 }
405 } else {
406 self.iter = Some((RustIteratorOfBuilder::Seek(iter), first_key, first_value));
407 }
408 }
409 None => {
410 self.iter = None;
411 }
412 }
413 Ok(())
414 }
415
416 async fn rev_seek_inner<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
417 match self.table_id.cmp(&key.user_key.table_id) {
418 std::cmp::Ordering::Less => {
419 return self.rewind().await;
420 }
421 std::cmp::Ordering::Greater => {
422 self.iter = None;
423 return Ok(());
424 }
425 _ => {}
426 }
427 let mut iter = B::seek(self.inner, key.user_key.table_key);
428 match iter.next() {
429 Some((first_key, first_value)) => {
430 if first_key.eq(&key.user_key.table_key) && self.epoch < key.epoch_with_gap {
431 match iter.next() {
434 Some((next_key, next_value)) => {
435 assert_lt!(next_key, first_key);
436 self.iter =
437 Some((RustIteratorOfBuilder::Seek(iter), next_key, next_value));
438 }
439 None => {
440 self.iter = None;
441 }
442 }
443 } else {
444 self.iter = Some((RustIteratorOfBuilder::Seek(iter), first_key, first_value));
445 }
446 }
447 None => {
448 self.iter = None;
449 }
450 }
451 Ok(())
452 }
453}
454
455impl<B: RustIteratorBuilder> HummockIterator for FromRustIterator<'_, B> {
456 type Direction = B::Direction;
457
458 async fn next(&mut self) -> HummockResult<()> {
459 let (iter, key, value) = self.iter.as_mut().expect("should be valid");
460 if let Some((new_key, new_value)) = iter.next() {
461 *key = new_key;
462 *value = new_value;
463 } else {
464 self.iter = None;
465 }
466 Ok(())
467 }
468
469 fn key(&self) -> FullKey<&[u8]> {
470 let (_, key, _) = self.iter.as_ref().expect("should be valid");
471 FullKey {
472 epoch_with_gap: self.epoch,
473 user_key: UserKey {
474 table_id: self.table_id,
475 table_key: *key,
476 },
477 }
478 }
479
480 fn value(&self) -> HummockValue<&[u8]> {
481 let (_, _, value) = self.iter.as_ref().expect("should be valid");
482 *value
483 }
484
485 fn is_valid(&self) -> bool {
486 self.iter.is_some()
487 }
488
489 async fn rewind(&mut self) -> HummockResult<()> {
490 let mut iter = B::rewind(self.inner);
491 if let Some((key, value)) = iter.next() {
492 self.iter = Some((RustIteratorOfBuilder::Rewind(iter), key, value));
493 } else {
494 self.iter = None;
495 }
496 Ok(())
497 }
498
499 async fn seek<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
500 match Self::Direction::direction() {
501 DirectionEnum::Forward => self.seek_inner(key).await,
502 DirectionEnum::Backward => self.rev_seek_inner(key).await,
503 }
504 }
505
506 fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}
507
508 fn value_meta(&self) -> ValueMeta {
509 ValueMeta::default()
510 }
511}
512
/// Runtime value naming the direction an iterator walks in, as returned by
/// `HummockIteratorDirection::direction`.
///
/// The enum has no fields, so it is `Copy`: a direction can be stored and compared repeatedly
/// without being moved out of its owner.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DirectionEnum {
    /// The iterator walks forward.
    Forward,
    /// The iterator walks backward.
    Backward,
}
518
519pub trait HummockIteratorDirection: Sync + Send + 'static {
520 fn direction() -> DirectionEnum;
521}
522
523pub struct Forward;
524impl HummockIteratorDirection for Forward {
525 #[inline(always)]
526 fn direction() -> DirectionEnum {
527 DirectionEnum::Forward
528 }
529}
530
531pub struct Backward;
532impl HummockIteratorDirection for Backward {
533 #[inline(always)]
534 fn direction() -> DirectionEnum {
535 DirectionEnum::Backward
536 }
537}
538
539pub trait IteratorFactory {
540 type Direction: HummockIteratorDirection;
541 type SstableIteratorType: SstableIteratorType<Direction = Self::Direction>;
542 fn add_batch_iter(&mut self, batch: SharedBufferBatch);
543 fn add_staging_sst_iter(&mut self, sst: Self::SstableIteratorType);
544 fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType);
545 fn add_concat_sst_iter(
546 &mut self,
547 sstable_table_infos: Vec<SstableInfo>,
548 sstable_store: SstableStoreRef,
549 read_options: Arc<SstableIteratorReadOptions>,
550 );
551}
552
553pub trait SkipWatermarkState: Send {
555 fn has_watermark(&self) -> bool;
557 fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool;
560 fn reset_watermark(&mut self);
562 fn advance_watermark(&mut self, key: &FullKey<&[u8]>) -> bool;
564}