use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange, is_empty_key_range, vnode_range};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

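/// A [`LocalStateStore`] implementation backed by Hummock. Writes are buffered in an in-memory
/// [`MemTable`]; `flush` turns the buffered ops into an immutable shared-buffer batch and hands
/// it to the uploader through `event_sender`.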
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    read_version: HummockReadVersionRef,

    is_replicated: bool,

    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
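    /// Point lookup against flushed data only. The read happens at `MAX_EPOCH` so that every
    /// flushed version is visible; the mem-table is not consulted.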
    async fn get_flushed(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        table_key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let table_key_range = (
            Bound::Included(table_key.clone()),
            Bound::Included(table_key.clone()),
        );

        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            read_version,
        )?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(table_key, MAX_EPOCH, read_options, read_snapshot)
            .await
    }

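    /// Forward range scan over flushed data, also read at `MAX_EPOCH`.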
    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(table_key_range, MAX_EPOCH, read_options, read_snapshot)
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
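    /// Wraps the mem-table buffer in a forward Hummock iterator tagged with the current epoch
    /// and spill offset.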
    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            EpochWithGap::new(self.epoch(), self.spill_offset),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            EpochWithGap::new(self.epoch(), self.spill_offset),
            self.table_id,
        )
    }

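    /// Forward scan that merges the local mem-table with the flushed data visible at `epoch`.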
    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            epoch,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            epoch,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

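/// A cheaply cloneable reader over data that has already been flushed out of the mem-table.
/// All reads go through the shared read version at `MAX_EPOCH`.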
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
        assert_eq!(self.table_id, read_options.table_id);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
        )
    }

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn get(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<Bytes>> {
        assert_eq!(self.table_id, read_options.table_id);
        match self.mem_table.buffer.get(&key) {
            None => LocalHummockFlushedSnapshotReader::get_flushed(
                &self.hummock_version_reader,
                &self.read_version,
                key,
                read_options,
            )
            .await
            .map(|e| e.map(|item| item.1)),
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            read_options.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            read_options.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

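    /// Drains the mem-table and writes the buffered key ops to the shared buffer as a single
    /// immutable batch. When sanity checks are enabled, each op is validated against the
    /// flushed snapshot first; when old-value logging is enabled, old values are collected
    /// alongside the new ones.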
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(
            kv_pairs,
            old_values,
            WriteOptions {
                epoch: self.epoch(),
                table_id: self.table_id,
            },
        )
        .await
    }

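    /// Spills the mem-table to the shared buffer if it has grown beyond
    /// `mem_table_spill_threshold`, provided the spill offset has not yet reached
    /// `MAX_SPILL_TIMES`.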
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "mem table spill skipped: the epoch gap has exceeded the available range"
                );
            }
        }

        Ok(())
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn is_dirty(&self) -> bool {
        self.mem_table.is_dirty()
    }

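    /// Initializes the local state store with its first epoch. Waits until the previous
    /// epoch's version has been committed locally, then (unless this instance is replicated)
    /// registers the epoch with the event handler.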
    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

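    /// Seals the current epoch and advances to `next_epoch`. The mem-table must already be
    /// flushed; pk-prefix table watermarks, if provided, are applied to the read version
    /// before the event handler is notified.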
    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.is_dirty());
        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(self.mem_table.buffer.is_empty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }
}

impl LocalHummockStorage {
    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

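    /// Builds an immutable shared-buffer batch from the drained key ops and registers it with
    /// the read version; for non-replicated instances the batch is also sent to the uploader.
    /// Blocks (with a warning) if the memory limiter cannot grant the required quota up front.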
    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        write_options: WriteOptions,
    ) -> StorageResult<usize> {
        let epoch = write_options.epoch;
        let table_id = write_options.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            self.read_version
                .write()
                .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

            if !self.is_replicated {
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imm })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

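// Iterator stacking: staging data (shared-buffer batches plus staging SSTs) is merged first,
// then combined with committed SSTs (overlapping and non-overlapping) and, for local reads,
// the mem-table iterator.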
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

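/// Forward iterator over a Hummock read. `initial_read` defers advancing the inner iterator so
/// that the first call to `try_next` returns the entry the iterator is already positioned on.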
pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

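/// Collects the per-source iterators discovered during read planning and assembles them into
/// the forward [`MergeIterator`] stack described above.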
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

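/// Backward counterpart of [`ForwardIteratorFactory`]. Note that `add_concat_sst_iter` reverses
/// the SST list so the concat iterator visits tables in descending key order.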
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}