use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

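/// `LocalHummockStorage` is a per-table-instance handle for reading from and writing to the
/// Hummock state store backend. Writes are buffered in a local mem-table until `flush`
/// converts them into a shared buffer batch.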
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle over both local staging data and the committed version.
    read_version: HummockReadVersionRef,

    /// Whether this instance replicates another `LocalHummockStorage`. Replicated instances
    /// do not report events (e.g. `InitEpoch`, `LocalSealEpoch`, `ImmToUploader`) to the
    /// event handler, so their data is never uploaded.
    is_replicated: bool,

    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

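// Reads through the flushed snapshot use `MAX_EPOCH`, so they observe all data that has
// already left the mem-table (staging imms/SSTs and the committed version), regardless of
// the instance's current epoch.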
impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

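// Unlike the flushed reader above, reads on `LocalHummockStorage` itself also see the
// unflushed mem-table: `iter_all` / `rev_iter_all` pass a mem-table iterator down to the
// version reader to be merged with staging and committed data at the current epoch.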
impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

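/// A cloneable, read-only view over the flushed state of one table: it shares the
/// instance's `HummockReadVersionRef` but never consults the mem-table.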
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

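// Point lookups consult the local mem-table first and fall back to the flushed snapshot only
// on a miss: a pending `Insert`/`Update` is served directly from the mem-table, and a pending
// `Delete` short-circuits to `None`.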
impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

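// Local iterators are constrained to a single vnode: the caller's key range must not span
// vnode boundaries, which is asserted before handing the range to `iter_all`/`rev_iter_all`.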
impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range, self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range, self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

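// `flush` drains the mem-table into a sorted batch of shared-buffer items. When sanity
// checking is enabled, every op is validated against a flushed-snapshot reader; old values
// are collected alongside the new ones only when old-value logging is required (see
// `is_flush_old_value`).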
impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        // An insert has no old value; push an empty placeholder to keep
                        // the old-value vector aligned with `kv_pairs`.
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

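    // `try_flush` spills the mem-table early once it grows beyond
    // `mem_table_spill_threshold`, capped at `MAX_SPILL_TIMES` spills per epoch (each
    // spill consumes one `spill_offset` slot in the epoch gap).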
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "no mem table spill occurs: the spill offset exceeds the available epoch gap range"
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

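    // Sealing advances the local epoch pair and resets the spill offset. Pk-prefix table
    // watermarks are first filtered against regression, then applied to the read version
    // under the epoch just sealed.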
    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

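    // `flush_inner` packages the drained mem-table into an immutable shared buffer batch:
    // it accounts the batch size against the memory limiter (blocking, after nudging the
    // event loop with `BufferMayFlush`, when over budget), registers the batch as staging
    // data in the read version, and forwards it to the uploader unless this instance is
    // replicated.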
    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            self.read_version
                .write()
                .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

            // Only non-replicated instances ship their data to the uploader.
            if !self.is_replicated {
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imm })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    /// Old values are flushed alongside new values only when consistent old values are
    /// maintained for the log store.
    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

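// Iterator plumbing: staging data (shared-buffer batches and staging SSTs) is merged first,
// followed by overlapping SSTs, non-overlapping (concat) SST levels, and finally the optional
// local mem-table, matching the `HummockIteratorUnion` variant order used by the factories
// below.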
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            // The underlying iterator is already positioned at the first row when it is
            // constructed, so only advance it on subsequent calls.
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

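/// Collects the sub-iterators selected by the read path and assembles them into the single
/// forward merge iterator described by `HummockStorageIteratorPayloadInner`.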
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Chain the iterators in `HummockIteratorUnion` variant order: staging data,
        // overlapping SSTs, non-overlapping (concat) SSTs, then the optional mem-table.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            // As in the forward iterator, the first row is already in position.
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

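/// Backward counterpart of `ForwardIteratorFactory`. The notable difference is in
/// `add_concat_sst_iter`: the SST list arrives in forward key order and must be reversed
/// before it is handed to the concat iterator.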
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        // SSTs arrive in ascending key order; reverse them so the backward concat
        // iterator visits them from the largest key to the smallest.
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}