use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::TableMemoryMetrics;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;
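/// A Hummock-backed `LocalStateStore` for a single table instance: writes are buffered in an
/// in-memory `MemTable` and turned into shared buffer batches (immutable memtables) on
/// `flush`/`try_flush`.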
pub struct LocalHummockStorage {
    /// In-memory write buffer for the current epoch.
    mem_table: MemTable,

    /// Number of spills within the current epoch; used as the gap part of `EpochWithGap`.
    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Shared read version tracking staging data (imms and SSTs) and table watermarks.
    read_version: HummockReadVersionRef,

    /// When `true`, flushed batches are kept locally as replicated imms and no events are sent
    /// to the uploader.
    is_replicated: bool,

    /// When `true`, imms are sent to the uploader on every flush instead of being deferred
    /// until the epoch is sealed.
    upload_on_flush: bool,

    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    table_memory_metrics: Arc<TableMemoryMetrics>,

    write_limiter: WriteLimiterRef,

    /// Watch channel carrying the latest pinned version; used by `wait_for_epoch`.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}
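// Read helpers for `LocalHummockFlushedSnapshotReader` (defined below). They only observe data
// that has already left the mem-table: staging imms/SSTs and the committed version, as selected
// by `read_filter_for_version`.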
impl LocalHummockFlushedSnapshotReader {
    /// Point lookup against flushed data only.
    async fn get_flushed<'a, O>(
        hummock_version_reader: &'a HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        table_option: TableOption,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
        epoch: HummockEpoch,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                epoch,
                user_key.table_id,
                table_option,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
        epoch: HummockEpoch,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
        epoch: HummockEpoch,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    /// Forward iterator merging the local mem-table with the flushed data of this table.
    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    /// Backward counterpart of `iter_all`.
    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}
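/// A cloneable, read-only view of one table's flushed state at a fixed epoch. Unlike
/// `LocalHummockStorage`, it never consults the local mem-table.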
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    table_option: TableOption,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
    epoch: HummockEpoch,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            self.table_option,
            read_options,
            on_key_value_fn,
            self.epoch,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options, self.epoch)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options, self.epoch)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            // Not in the mem-table: fall back to the flushed state.
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    self.table_option,
                    read_options,
                    on_key_value_fn,
                    self.epoch(),
                )
                .await
            }
            // The mem-table holds the latest op for this key, so it alone decides the result.
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "iter read range {:?} for table {} covers more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "rev_iter read range {:?} for table {} covers more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        assert_eq!(
            self.table_option.retention_seconds, None,
            "flushed snapshot reader should not work with table {} with ttl",
            self.table_id
        );
        self.new_flushed_snapshot_reader_inner(MAX_EPOCH)
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}
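// Write-epoch lifecycle: `flush` drains the mem-table, `try_flush` spills early when the
// mem-table exceeds the spill threshold, `init` waits until the previous epoch is visible in the
// pinned version, and `seal_current_epoch` advances the epoch pair and notifies the event
// handler.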
impl StateStoreWriteEpochControl for LocalHummockStorage {
    /// Drains the mem-table into sorted key-value pairs (plus old values when old-value logging
    /// is enabled), runs optional sanity checks against the flushed state, and hands the batch
    /// to `flush_inner`.
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner(self.epoch()))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    /// Spills the mem-table early when it grows past `mem_table_spill_threshold`, as long as the
    /// spill offset has not exhausted the epoch gap range.
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                self.flush().await?;
                self.table_memory_metrics.mem_table_spill_counts.inc();
            } else {
                tracing::warn!(
                    "mem table spill skipped: the spill offset has reached the maximum spill times"
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.read_version.write().init();
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }
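    /// Seals the current epoch and switches to `next_epoch`: requires a clean mem-table, ships
    /// any pending imms to the uploader, records new table watermarks, resets the spill offset,
    /// and notifies the event handler for non-replicated instances.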
    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, watermark_type)) = &mut opts.table_watermarks {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: *watermark_type,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "uncommitted staging data in read version of table_id {:?} instance_id {:?} when updating the vnode bitmap",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(
        &self,
        epoch: HummockEpoch,
    ) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            table_option: self.table_option,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            epoch,
        }
    }
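    /// Builds a `SharedBufferBatch` from the drained items, charges its size to the memory
    /// limiter (blocking and nudging the uploader if the quota is exhausted), registers it as a
    /// pending imm in the read version (or as a replicated imm), and forwards pending imms to
    /// the uploader when `upload_on_flush`, the pending size, or memory pressure requires it.
    /// Returns the batch size.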
    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        self.table_memory_metrics
            .write_batch_tuple_counts
            .inc_by(sorted_items.len() as _);
        let timer = self.table_memory_metrics.write_batch_duration.start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.table_memory_metrics.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                self.table_memory_metrics.clone(),
            );
            self.spill_offset += 1;

            if self.is_replicated {
                self.read_version.write().add_replicated_imm(imm);
            } else {
                self.write_limiter.wait_permission(self.table_id).await;
                let limiter = &self.memory_limiter;
                // Try the fast path first; if the quota is exhausted, ask the uploader to flush
                // buffers and block until the memory can be acquired.
                let (tracker, fast_required_memory) = match limiter.try_require_memory(size as u64)
                {
                    Some(tracker) => (tracker, true),
                    None => {
                        warn!(
                            "blocked while acquiring memory: {}, current {}",
                            size,
                            limiter.get_memory_usage()
                        );
                        self.event_sender
                            .send(HummockEvent::BufferMayFlush)
                            .expect("should be able to send");
                        let tracker = limiter
                            .require_memory(size as u64)
                            .instrument_await("hummock_require_memory".verbose())
                            .await;
                        warn!(
                            "successfully acquired memory: {}, current {}",
                            size,
                            limiter.get_memory_usage()
                        );
                        (tracker, false)
                    }
                };
                let mut read_version = self.read_version.write();
                read_version.add_pending_imm(imm, tracker);
                if self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold
                    || !fast_required_memory
                {
                    let imms = read_version.start_upload_pending_imms();
                    self.event_sender
                        .send(HummockEvent::ImmToUploader { instance_id, imms })
                        .map_err(|_| {
                            HummockError::other(
                                "failed to send imm to uploader. maybe shutting down",
                            )
                        })?;
                }
            }

            size
        } else {
            0
        };

        timer.observe_duration();

        self.table_memory_metrics
            .write_batch_size
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let table_memory_metrics = Arc::new(TableMemoryMetrics::new(
            hummock_version_reader.stats(),
            option.table_id,
            option.is_replicated,
        ));
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            table_memory_metrics,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    /// Old values are flushed alongside new values only when the op consistency level requires
    /// consistent old values for log store usage.
    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}
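// Iterator layering: shared buffer batches and staging SSTs are merged into a staging iterator
// first; the payload then merges it with overlapping SSTs, concatenated non-overlapping SSTs
// and, for local reads, the mem-table iterator.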
pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;
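/// Forward iterator handed out by the read path: wraps the merged payload in a `UserIterator`
/// and records per-iterator statistics when dropped.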
pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        // The inner iterator is already positioned on its first entry when constructed, so the
        // first `try_next` reads it without advancing.
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}
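/// Collects forward component iterators while planning a read; `build` assembles them into the
/// merged payload (staging first, then overlapping SSTs, non-overlapping SSTs, and the optional
/// mem-table iterator).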
#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}
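/// Backward counterpart of `ForwardIteratorFactory`. SST lists for the concat iterator are
/// reversed so that a backward scan visits them in descending key order.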
#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}