risingwave_storage/hummock/store/
local_hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
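///
/// A minimal usage sketch (illustrative only; the `for_test` constructors are
/// assumptions about this crate's test helpers, not part of this file):
///
/// ```ignore
/// let mut local = hummock_storage.new_local(NewLocalOptions::for_test(table_id)).await;
/// local.init(InitOptions::new(epoch_pair)).await?;
/// local.insert(table_key, new_value, None)?; // stage a write in the mem-table
/// local.flush().await?; // turn the mem-table into an immutable memtable (imm)
/// local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
/// ```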
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,

    /// Indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It's used by executors in different CNs to synchronize states.
    ///
    /// Within `LocalHummockStorage`, this flag prevents the local state from being uploaded
    /// for persistence, so that we won't end up with duplicate data.
    ///
    /// It also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice,
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Whether or not to send imms to the uploader on every flush.
    upload_on_flush: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
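    /// Point-get against flushed data only. Reading at `MAX_EPOCH` makes every
    /// flushed write (shared-buffer imms and SSTs) visible; the unflushed
    /// `MemTable` is not consulted here.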
    async fn get_flushed<O>(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl crate::store::KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

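    /// Combines the current epoch with the spill offset, so that imms spilled
    /// multiple times within the same epoch remain ordered.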
    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

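    /// Iterates over both flushed data and the unflushed `MemTable`, by stacking
    /// a mem-table iterator on top of the read snapshot.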
    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

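/// A read-only view over the flushed data of a table shard. It shares the
/// `HummockReadVersionRef` with the `LocalHummockStorage` it was created from,
/// but never observes the unflushed `MemTable`.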
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
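        // Serve the read from the unflushed mem-table first; fall back to the
        // flushed snapshot on a miss.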
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} rev_iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
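        // Convert each mem-table `KeyOp` into a shared-buffer item, optionally
        // running sanity checks against the flushed snapshot.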
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with the consistency
                // semantics checked below. As a workaround, you may disable the checks by
                // initializing the state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

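    /// Spills the mem-table to an imm when it grows beyond
    /// `mem_table_spill_threshold`. Each spill within an epoch advances
    /// `spill_offset`, which is capped at `MAX_SPILL_TIMES`.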
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!(
                    "mem table spill skipped: the spill offset has reached MAX_SPILL_TIMES"
                );
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have initialized the epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // Only `PkPrefix` watermarks are applied to the read version for reads.
        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
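            // Reserve write-buffer memory without blocking if possible; on failure,
            // nudge the event handler to flush buffers and then block until the
            // reservation succeeds.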
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully required memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            // Insert the imm into the uploader.
            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;
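
// Note: the `'static` aliases are used for reads that do not borrow a local
// mem-table (e.g. flushed-snapshot reads), while the `Local*` aliases may
// borrow the owning storage's mem-table for the iterator's lifetime.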

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
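        // The underlying iterator is already positioned on its first entry after
        // seek/rewind, so the first call returns the current entry without advancing.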
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Merge staging data, overlapping SSTs, non-overlapping (concat) SSTs, and
        // the optional mem-table iterator into a single payload iterator.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // Merge staging data, overlapping SSTs, non-overlapping (concat) SSTs, and
        // the optional mem-table iterator into a single payload iterator.
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
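        // SSTs arrive ordered by ascending key range; the backward concat
        // iterator consumes them in iteration order, so reverse the list.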
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}