risingwave_storage/hummock/store/local_hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
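///
/// A minimal usage sketch (hypothetical; the fields of the option structs are
/// elided and error handling is omitted):
///
/// ```ignore
/// // Obtain a per-shard handle from the global storage.
/// let mut local = hummock_storage.new_local(NewLocalOptions { .. }).await;
/// // Bind the handle to its first epoch before any read or write.
/// local.init(InitOptions { .. }).await?;
/// local.insert(key, new_value, None)?;
/// // Flush the mem-table into an immutable shared buffer batch.
/// local.flush().await?;
/// local.seal_current_epoch(next_epoch, seal_opts);
/// ```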
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,
    /// Indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It is used by executors on different CNs to synchronize state.
    ///
    /// Within `LocalHummockStorage` we use this flag to avoid uploading the local state
    /// for persistence, so that no duplicate data is stored.
    ///
    /// It also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

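        // Read at `MAX_EPOCH` so that the latest flushed data is always
        // visible; the flushed snapshot reader never sees mem-table data.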
        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

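    /// Combines the current epoch with the in-epoch spill offset, so that data
    /// spilled multiple times within the same epoch stays correctly ordered.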
    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

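    /// Reads at the given epoch, merging the uncommitted mem-table with the
    /// staging and committed data in the read version.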
    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
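        // Consult the uncommitted mem-table first; fall back to the flushed
        // snapshot only when the key has no pending write in this shard.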
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                    Ok(Some(on_key_value_fn(
                        FullKey::new_with_gap_epoch(
                            self.table_id,
                            key.table_key.to_ref(),
                            self.current_epoch_with_gap(),
                        ),
                        value.as_ref(),
                    )?))
                }
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
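                    // An insert has no old value; push an empty placeholder so
                    // the old values stay index-aligned with `kv_pairs`.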
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

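    /// Spills the mem-table to the shared buffer early when its size exceeds
    /// `mem_table_spill_threshold`, bounding per-epoch memory usage. Each
    /// spill consumes one slot of the in-epoch spill offset, of which there
    /// are at most `MAX_SPILL_TIMES`.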
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "mem table spill skipped: the spill offset has reached MAX_SPILL_TIMES"
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // Only the `PkPrefix` watermark is updated for reads.
        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
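            // Apply write backpressure: try to reserve memory for the batch;
            // if the quota is exhausted, nudge the uploader to flush buffers
            // and then block until the reservation succeeds.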
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            self.read_version
                .write()
                .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

            // Insert the imm into the uploader.
            if !self.is_replicated {
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imm })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

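    /// Old values are buffered and flushed only when the op consistency level
    /// is `ConsistentOldValue` with `is_log_store: true`, i.e. when a change
    /// log store consumes them.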
    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
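        // The inner iterator is already positioned on the first entry when it
        // is constructed, so only advance it on subsequent calls.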
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Merge staging data, overlapping SSTs, non-overlapping (concat) SSTs,
        // and the optional mem-table into a single forward iterator.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // Merge staging data, overlapping SSTs, non-overlapping (concat) SSTs,
        // and the optional mem-table into a single backward iterator.
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
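        // Concat iterators assume tables are ordered in iteration direction;
        // reverse the key-ascending list for backward iteration.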
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}