risingwave_storage/hummock/store/local_hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;
/// `LocalHummockStorage` is a handle for a state table shard to read data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
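///
/// A minimal usage sketch, not compiled as a doc test; the store handle, keys,
/// values, and epochs are illustrative assumptions, and the `for_test`/`new`
/// constructors are assumed to be available:
///
/// ```ignore
/// let mut local = hummock_storage.new_local(NewLocalOptions::for_test(table_id)).await;
/// local.init(InitOptions::new(epoch_pair)).await?;
/// local.insert(table_key, new_value, None)?;
/// local.flush().await?;
/// local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
/// ```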
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,

    /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It's used by executors in different CNs to synchronize states.
    ///
    /// Within `LocalHummockStorage`, we use this flag to avoid uploading local state for
    /// persistence, so that we won't have duplicate data.
    ///
    /// This also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice,
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Whether to send imms to the uploader on every flush.
    upload_on_flush: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range, self.epoch(), read_options).await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range, self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}
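
// A hedged sketch of the write-path contract above (`local`, `key`, `v1`, and
// `v2` are illustrative assumptions): an insert passes no old value, an update
// passes the expected old value, and a delete names the value it removes, so
// the sanity checks in `flush` can verify consistency (unless the store was
// created with `is_consistent_op=false`):
//
//     local.insert(key.clone(), v1, None)?;      // plain insert
//     local.insert(key.clone(), v2, Some(v1))?;  // update: old value must match
//     local.delete(key, v2)?;                    // delete: old value must match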

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "No mem table spill occurs: the spill offset exceeds the available range."
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have initialized the epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // Only update the `PkPrefix` watermark for reads.
        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}
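
// A hedged sketch of the epoch lifecycle driven through the impl above (the
// epoch values and `for_test` constructors are illustrative assumptions):
// `init` must run exactly once before any writes, `flush`/`try_flush` turn the
// mem-table into an immutable batch, and `seal_current_epoch` advances to a
// strictly greater epoch with a clean mem-table.
//
//     local.init(InitOptions::new(EpochPair::new_test_epoch(e1))).await?;
//     // ... insert / delete ...
//     local.flush().await?;
//     local.seal_current_epoch(e2, SealCurrentEpochOptions::for_test()); // e2 > e1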

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in the read version of table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked while requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully required memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            // Send the imm to the uploader.
            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}
impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        // The underlying iterator is already positioned at the first key, so
        // the first call yields the current entry without advancing.
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}
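
// A hedged sketch of driving the iterator (the key range and read options are
// illustrative assumptions): because the iterator starts positioned at the
// first key, a plain `try_next` loop visits every entry exactly once.
//
//     let mut iter = local.iter(key_range, ReadOptions::default()).await?;
//     while let Some((full_key, value)) = iter.try_next().await? {
//         // use `full_key` / `value`
//     }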

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Merge the staging, overlapping, non-overlapping, and mem-table iterators.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        // As in the forward iterator, the first call yields the current entry
        // without advancing.
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // Merge the staging, overlapping, non-overlapping, and mem-table iterators.
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        // Reverse the SSTs so the concat iterator scans them from back to front.
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}