risingwave_storage/hummock/store/local_hummock_storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange, is_empty_key_range, vnode_range};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::{StagingData, VersionUpdate};
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
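///
/// A minimal write-path sketch (not a doctest; `hummock_storage`, `new_local_options`,
/// `init_options`, `table_key`, `new_value`, `next_epoch` and `seal_options` are assumed to be
/// prepared by the caller and only illustrate the call order implemented below):
///
/// ```ignore
/// // Obtain a per-table-shard handle from the global state store.
/// let mut local = hummock_storage.new_local(new_local_options).await;
/// // Bind the handle to its first epoch before reading or writing.
/// local.init(init_options).await?;
/// // Stage a write in the mem table; `None` means there is no old value (plain insert).
/// local.insert(table_key, new_value, None)?;
/// // Convert staged writes into an immutable memtable and hand it to the uploader.
/// local.flush().await?;
/// // Seal the current epoch and advance to the next one.
/// local.seal_current_epoch(next_epoch, seal_options);
/// ```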
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,

    /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It's used by executors in different CNs to synchronize state.
    ///
    /// Within `LocalHummockStorage`, we use this flag to avoid uploading local state for
    /// persistence, so we won't have duplicate data.
    ///
    /// This also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice,
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed(
        hummock_version_reader: &HummockVersionReader,
        read_version: &HummockReadVersionRef,
        table_key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
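        // Point get against flushed data only: build a single-key inclusive range and read at
        // MAX_EPOCH so that every flushed entry visible to this read version is covered, while
        // the local mem table is intentionally excluded.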
        let table_key_range = (
            Bound::Included(table_key.clone()),
            Bound::Included(table_key.clone()),
        );

        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            read_version,
        )?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(table_key, MAX_EPOCH, read_options, read_snapshot)
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(table_key_range, MAX_EPOCH, read_options, read_snapshot)
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
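    // The mem table can be iterated as if it were Hummock data: entries are stamped with the
    // current epoch plus the current spill offset, so they sort as newer than immutable
    // memtables already spilled within the same epoch.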
    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            EpochWithGap::new(self.epoch(), self.spill_offset),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            EpochWithGap::new(self.epoch(), self.spill_offset),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            epoch,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            epoch,
            read_options.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
        assert_eq!(self.table_id, read_options.table_id);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
        )
    }

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn get(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<Bytes>> {
        assert_eq!(self.table_id, read_options.table_id);
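        // Serve the read from the local mem table first; fall back to the flushed snapshot
        // (immutable memtables and SSTs) only when the key has no pending op in this epoch.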
        match self.mem_table.buffer.get(&key) {
            None => LocalHummockFlushedSnapshotReader::get_flushed(
                &self.hummock_version_reader,
                &self.read_version,
                key,
                read_options,
            )
            .await
            .map(|e| e.map(|item| item.1)),
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            read_options.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            read_options.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
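        // Convert each staged op into a shared buffer item. When old-value flushing is enabled
        // (log store tables), collect the old value alongside in the same order, using empty
        // bytes as the placeholder for inserts.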
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(
            kv_pairs,
            old_values,
            WriteOptions {
                epoch: self.epoch(),
                table_id: self.table_id,
            },
        )
        .await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
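            // Spill the mem table early so that a large dirty buffer does not accumulate within
            // one epoch. Each spill consumes one slot of the epoch gap, so spilling is bounded
            // by MAX_SPILL_TIMES per epoch.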
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.table_id().to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!("No mem table spill occurs: the epoch gap exceeds the available range.");
            }
        }

        Ok(())
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    fn is_dirty(&self) -> bool {
        self.mem_table.is_dirty()
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
492            "local state store of table id {:?} is init for more than once",
            self.table_id
        );
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.is_dirty());
        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // only update the PkPrefix watermark for read
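        // Regressed watermarks are filtered out first so that the watermark recorded in the
        // read version never moves backwards.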
        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
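        // Vnode bitmap updates happen on scaling. Wait until the previously sealed epoch's data
        // is visible in the committed version, and assert that no dirty or staging data remains
        // before the set of owned vnodes changes.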
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(self.mem_table.buffer.is_empty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }
}

impl LocalHummockStorage {
    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
        write_options: WriteOptions,
    ) -> StorageResult<usize> {
        let epoch = write_options.epoch;
        let table_id = write_options.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

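            // Apply write back-pressure before building the immutable memtable: wait for the
            // per-table write limiter, then try to reserve memory for the batch. If the
            // reservation fails, ask the event handler to flush buffers and block until the
            // memory is granted.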
            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            self.read_version
                .write()
                .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

            // insert imm to uploader
            if !self.is_replicated {
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imm })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
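        // The inner iterator is already positioned at its first entry when constructed, so the
        // first call returns the current entry without advancing; later calls advance first.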
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Build the merged payload iterator: staging data first, then overlapping SSTs,
        // non-overlapping concat iterators, and finally the optional mem-table iterator.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // Build the merged payload iterator in the same order as the forward factory, but over
        // backward iterators.
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}