use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

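/// Per-table-shard write handle of the Hummock state store. Writes are buffered in a
/// [`MemTable`], drained into shared buffer batches on flush/spill, and served for reads
/// together with the flushed data tracked by the shared read version.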
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle over the flushed and in-flight data of this instance.
    read_version: HummockReadVersionRef,

    /// Whether this instance replicates data of another one. Replicated instances keep their
    /// data local and never send events (init/seal/imm upload) to the hummock event handler.
    is_replicated: bool,

    /// Whether to send imms to the uploader on every flush. When false, imms stay pending until
    /// they exceed `mem_table_spill_threshold` or the epoch is sealed.
    upload_on_flush: bool,

    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<'a, O>(
        hummock_version_reader: &'a HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

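/// Cloneable, read-only view over the data of a [`LocalHummockStorage`] that has already left
/// its mem-table: reads go through the shared read version (staging imms/SSTs and the committed
/// version) and never see unflushed [`MemTable`] entries.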
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            // No pending write in the mem-table: read from the flushed data.
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            // The key has an unflushed write in the mem-table: serve it at the current epoch.
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                    Ok(Some(on_key_value_fn(
                        FullKey::new_with_gap_epoch(
                            self.table_id,
                            key.table_key.to_ref(),
                            self.current_epoch_with_gap(),
                        ),
                        value.as_ref(),
                    )?))
                }
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

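// Write-epoch lifecycle: `init` waits for the previous epoch to become visible before the first
// write, `flush`/`try_flush` drain the mem-table into shared buffer batches, and
// `seal_current_epoch` advances the epoch and hands pending imms and table watermarks over to
// the event handler.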
impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        // Inserts have no old value; push an empty placeholder so that
                        // `old_values` stays aligned with `kv_pairs`.
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "mem table spill skipped: spill offset has reached MAX_SPILL_TIMES for the current epoch"
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.read_version.write().init();
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have initialized the epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

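    /// Builds a `SharedBufferBatch` from the drained mem-table items, accounting its size with
    /// the memory limiter, registers it as a new imm in the read version, and forwards pending
    /// imms to the uploader when eager upload is enabled or their size reaches the spill
    /// threshold. Returns the size of the newly created imm.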
    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked while requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

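/// Iterator stacks assembled by the factories at the bottom of this file: staging data (shared
/// buffer batches and spilled SSTs) is merged first, then combined with overlapping SSTs,
/// non-overlapping (concat) SSTs, and optionally a mem-table iterator.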
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

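/// The `'static` aliases are for iterators that borrow no local mem-table (e.g. reads through
/// the flushed snapshot reader), while the lifetime-parameterized variants may hold a borrow of
/// the owning storage's mem-table.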
pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

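/// Forward user-key iterator over the merged payload. The `initial_read` flag makes the first
/// `try_next` call return the row the inner iterator is already positioned on, instead of
/// advancing past it.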
pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

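/// Collects forward sub-iterators via the [`IteratorFactory`] trait and merges them into a
/// [`HummockStorageIteratorPayloadInner`]: staging data, overlapping SSTs, non-overlapping
/// concat SSTs, and the optional mem-table iterator.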
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

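/// Backward counterpart of [`ForwardIteratorFactory`]. Note that `add_concat_sst_iter` reverses
/// the SST list so the concat iterator visits non-overlapping SSTs in reverse key order.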
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}