use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

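/// Per-instance local state store over Hummock.
///
/// Writes are buffered in `mem_table` and turned into shared buffer batches on `flush`; reads
/// merge the mem-table with the shared `read_version` (staging data plus the pinned committed
/// version). Unless `is_replicated` is set, flushed batches are forwarded to the uploader
/// through `event_sender`, either eagerly when `upload_on_flush` is set, or lazily when the
/// epoch is sealed or the pending size crosses `mem_table_spill_threshold`.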
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    read_version: HummockReadVersionRef,

    is_replicated: bool,

    upload_on_flush: bool,

    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

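// The reads below go through `read_filter_for_version` and the `HummockVersionReader` at
// `MAX_EPOCH`, so they observe everything that has been flushed out of the mem-table
// (staging imms/SSTs and the committed version) but never the mem-table itself.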
impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

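// Read helpers used by `LocalStateStore::{iter, rev_iter}`: unlike the flushed snapshot reader,
// they read at the current epoch and chain an iterator over the unflushed `mem_table` into the
// merge, so uncommitted local writes are visible.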
impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

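/// Cheaply cloneable reader over the flushed data of one table. It shares the owning
/// `LocalHummockStorage`'s `read_version` but never sees its mem-table, so it can be handed
/// out (e.g. for sanity checks during `flush`) without borrowing the local store mutably.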
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

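// Point get on the local store: consult the unflushed mem-table first. An `Insert`/`Update`
// entry is served directly at the current epoch-with-gap, a `Delete` entry short-circuits to
// `None`, and only a miss falls back to the flushed data.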
impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

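// `iter`/`rev_iter` on the local store assert that the requested range stays within a single
// vnode, then merge the mem-table with flushed data at the current epoch.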
impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter spans more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter spans more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

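// `flush` drains the mem-table and turns each `KeyOp` into a `SharedBufferValue` (collecting
// old values when old-value logging is enabled); when sanity checks are on, every op is
// validated against a flushed snapshot reader before being staged via `flush_inner`.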
impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!("mem table spill skipped: spill offset has reached MAX_SPILL_TIMES");
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

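    // Stages the drained key-value pairs as an immutable shared buffer batch: acquire write
    // permission and memory quota, build the batch (bumping `spill_offset` so repeated spills
    // within one epoch stay ordered), register it in the read version, and forward it to the
    // uploader when this instance uploads eagerly or the pending size crosses the threshold.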
    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked while requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

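// Iterator plumbing: a storage iterator merges (1) staging data (shared buffer batches and
// staging SSTs), (2) overlapping committed SSTs, (3) non-overlapping committed SSTs behind a
// concat iterator, and (4) optionally the local mem-table, in that union order.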
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

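/// User-facing forward iterator: wraps the merged payload in a `UserIterator`. The
/// `initial_read` flag makes the first `try_next` return the row the iterator is already
/// seeked to instead of advancing first.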
pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

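/// Collects the forward sub-iterators produced while planning a read (staging batches and SSTs,
/// overlapping SSTs, non-overlapping SST groups) and assembles them, plus an optional mem-table
/// iterator, into the merged payload iterator.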
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

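/// Backward counterpart of `ForwardIteratorFactory`. `add_concat_sst_iter` reverses the SST
/// list so the concat iterator visits non-overlapping tables from the largest key downwards.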
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}