risingwave_storage/hummock/store/local_hummock_storage.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::TableMemoryMetrics;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
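///
/// # Example (sketch)
///
/// A minimal, illustrative write path, assuming a `HummockStorage` handle plus suitable
/// `NewLocalOptions`, `InitOptions`, and `SealCurrentEpochOptions` values are already in
/// scope (the option values below are placeholders, not defined in this file):
///
/// ```ignore
/// // Create a local handle for one state table shard.
/// let mut local = hummock_storage.new_local(new_local_options).await;
/// // Bind the handle to its first epoch before any read or write.
/// local.init(init_options).await?;
/// // Buffer a write in the mem-table, then flush it into an immutable memtable.
/// local.insert(table_key, new_value, None)?;
/// local.flush().await?;
/// // Advance to the next epoch once all writes for the current one are flushed.
/// local.seal_current_epoch(next_epoch, seal_options);
/// ```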
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,

    /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It's used by executors in different CNs to synchronize states.
    ///
    /// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be
    /// persisted, so we won't have duplicate data.
    ///
    /// This also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice,
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Whether or not to send imms to the uploader on every flush.
    upload_on_flush: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    table_memory_metrics: Arc<TableMemoryMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<'a, O>(
        hummock_version_reader: &'a HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        table_option: TableOption,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
        epoch: HummockEpoch,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                epoch,
                user_key.table_id,
                table_option,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
        epoch: HummockEpoch,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
        epoch: HummockEpoch,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

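    // `EpochWithGap` pairs the current epoch with the spill offset, so data spilled
    // multiple times within the same epoch still has a well-defined ordering.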
    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                self.table_option,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    table_option: TableOption,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
    epoch: HummockEpoch,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            self.table_option,
            read_options,
            on_key_value_fn,
            self.epoch,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options, self.epoch)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options, self.epoch)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
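        // Serve the read from the local mem-table first; fall back to the flushed
        // snapshot (staging imms and SSTs) only when the key has no pending local op.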
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    self.table_option,
                    read_options,
                    on_key_value_fn,
                    self.epoch(),
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok({
                    Some(on_key_value_fn(
                        FullKey::new_with_gap_epoch(
                            self.table_id,
                            key.table_key.to_ref(),
                            self.current_epoch_with_gap(),
                        ),
                        value.as_ref(),
                    )?)
                }),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        assert_eq!(
            self.table_option.retention_seconds, None,
            "flushed snapshot reader should not work with table {} with ttl",
            self.table_id
        );
        self.new_flushed_snapshot_reader_inner(MAX_EPOCH)
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
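        // Drain the mem-table into a sorted batch: run per-op sanity checks when
        // enabled, convert each `KeyOp` into a `SharedBufferValue`, and collect old
        // values only when this table is configured to flush them (log store tables).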
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner(self.epoch()))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

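    /// Spill the mem-table early when it grows beyond `mem_table_spill_threshold`,
    /// as long as the spill offset has not yet exhausted `MAX_SPILL_TIMES` within
    /// the current epoch.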
    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                self.flush().await?;
                self.table_memory_metrics.mem_table_spill_counts.inc();
            } else {
                tracing::warn!("No mem table spill occurred: the spill offset exceeds the available epoch gap range.");
            }
        }

        Ok(())
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.read_version.write().init();
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
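        // Sealing requires a clean mem-table: every local change must already have been
        // flushed into an imm before the epoch can advance.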
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have initialized the epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        if let Some((direction, watermarks, watermark_type)) = &mut opts.table_watermarks {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: *watermark_type,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
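        // Wait until the previous epoch is committed and visible before rescaling, and
        // require that no uncommitted staging data remains for this instance.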
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(
        &self,
        epoch: HummockEpoch,
    ) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            table_option: self.table_option,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
            epoch,
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        self.table_memory_metrics
            .write_batch_tuple_counts
            .inc_by(sorted_items.len() as _);
        let timer = self.table_memory_metrics.write_batch_duration.start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.table_memory_metrics.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                self.table_memory_metrics.clone(),
            );
            self.spill_offset += 1;

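            // Replicated instances only mirror data for local reads; their imms stay in
            // the read version and are never sent to the uploader for persistence.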
            if self.is_replicated {
                self.read_version.write().add_replicated_imm(imm);
            } else {
                self.write_limiter.wait_permission(self.table_id).await;
                let limiter = &self.memory_limiter;
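                // Try to reserve memory for the imm without blocking; if the limiter is
                // saturated, nudge the uploader with `BufferMayFlush` and then wait for
                // the reservation, so the flush backpressures instead of overcommitting.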
                let (tracker, fast_required_memory) = match limiter.try_require_memory(size as u64)
                {
                    Some(tracker) => (tracker, true),
                    None => {
                        warn!(
                            "blocked while acquiring memory: {}, current {}",
                            size,
                            limiter.get_memory_usage()
                        );
                        self.event_sender
                            .send(HummockEvent::BufferMayFlush)
                            .expect("should be able to send");
                        let tracker = limiter
                            .require_memory(size as u64)
                            .instrument_await("hummock_require_memory".verbose())
                            .await;
                        warn!(
                            "successfully acquired memory: {}, current {}",
                            size,
                            limiter.get_memory_usage()
                        );
                        (tracker, false)
                    }
                };
                let mut read_version = self.read_version.write();
                read_version.add_pending_imm(imm, tracker);
                if self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold
                    || !fast_required_memory
                {
                    let imms = read_version.start_upload_pending_imms();
                    self.event_sender
                        .send(HummockEvent::ImmToUploader { instance_id, imms })
                        .map_err(|_| {
                            HummockError::other(
                                "failed to send imm to uploader. maybe shutting down",
                            )
                        })?;
                }
            }

            size
        } else {
            0
        };

        timer.observe_duration();

        self.table_memory_metrics
            .write_batch_size
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let table_memory_metrics = Arc::new(TableMemoryMetrics::new(
            hummock_version_reader.stats(),
            option.table_id,
            option.is_replicated,
        ));
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            table_memory_metrics,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
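        // The inner iterator is already positioned when the handle is constructed, so
        // the first `try_next` returns the current entry instead of advancing past it.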
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // 3. build user_iterator
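        // Merge order: staging data (imms + staging SSTs) first, then overlapping SSTs,
        // then non-overlapping (concat) SSTs, and finally the optional mem-table iterator.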
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // 3. build user_iterator
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
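        // The concat iterator consumes tables in visit order, so reverse them here to
        // walk the non-overlapping SSTs from the largest keys down during a backward scan.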
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}