use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

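/// A per-table-instance write handle into the Hummock state store. Writes are staged in a
/// local `MemTable`, flushed into the shared buffer as immutable memtables, and handed off to
/// the uploader through `HummockEvent`s; reads combine the mem-table with the instance's
/// `HummockReadVersion`.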
pub struct LocalHummockStorage {
    /// Uncommitted writes of the current epoch, buffered before flush.
    mem_table: MemTable,

    /// Number of times the mem-table has been spilled within the current epoch; used as the
    /// gap part of `EpochWithGap` to order spilled batches.
    spill_offset: u16,
    /// Current write epoch pair. Set by `init` and advanced by `seal_current_epoch`.
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Shared read handle over the staging and committed data of this instance.
    read_version: HummockReadVersionRef,

    /// Whether this instance replicates data owned by another instance. Replicated instances
    /// never send init/seal/upload events to the hummock event handler.
    is_replicated: bool,

    /// Whether new immutable memtables are sent to the uploader immediately on `flush`,
    /// instead of being kept pending until the epoch is sealed.
    upload_on_flush: bool,

    /// Channel for sending events to the hummock event handler.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    /// When the mem-table grows beyond this size, `try_flush` spills it to the shared buffer
    /// (0 disables spilling).
    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

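/// Read-only snapshot reader over the data that has already left the mem-table (staging
/// imms/SSTs plus the committed version), always reading at `MAX_EPOCH`.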
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({
                    Some(on_key_value_fn(
                        FullKey::new_with_gap_epoch(
                            self.table_id,
                            key.table_key.to_ref(),
                            self.current_epoch_with_gap(),
                        ),
                        value.as_ref(),
                    )?)
                }),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!("mem table spill skipped: spill offset reached MAX_SPILL_TIMES");
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully required memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

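// Iterator type aliases. Each `*PayloadInner` merge iterator layers, in order: staging data
// (shared buffer batches and staging SSTs), overlapping SSTs, concat iterators over
// non-overlapping SSTs, and optionally the local mem-table.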
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

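/// Forward iterator over a Hummock read snapshot. Wraps a `UserIterator` together with an
/// `IterLocalMetricsGuard`; local statistics are collected when the iterator is dropped.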
pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

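/// Collects the forward sub-iterators (staging batches/SSTs, overlapping SSTs, and concat
/// iterators over non-overlapping SSTs) planned for a read and merges them into a single
/// `HummockStorageIteratorPayloadInner`.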
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

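/// Backward counterpart of `HummockStorageIteratorInner`, built on a `BackwardUserIterator`.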
pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

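/// Backward counterpart of `ForwardIteratorFactory`: collects backward sub-iterators and merges
/// them into a `StorageRevIteratorPayloadInner`.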
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        // The concat iterator scans backward, so visit the non-overlapping SSTs in reverse
        // key order.
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}