risingwave_storage/hummock/store/local_hummock_storage.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, MAX_SPILL_TIMES};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, is_empty_key_range, vnode_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use tracing::{Instrument, warn};

use super::version::VersionUpdate;
use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
    Backward, BackwardUserIterator, ConcatIteratorInner, Forward, HummockIteratorUnion,
    IteratorFactory, MergeIterator, UserIterator,
};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::shared_buffer::shared_buffer_batch::{
    SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
    SharedBufferValue,
};
use crate::hummock::store::version::{HummockVersionReader, read_filter_for_version};
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
    wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
    BackwardSstableIterator, HummockError, MemoryLimiter, SstableIterator,
    SstableIteratorReadOptions, SstableStoreRef,
};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableHummockRevIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::store::*;

/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
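///
/// # Example
///
/// A simplified lifecycle sketch (hypothetical epochs and option values; how
/// `NewLocalOptions`, `InitOptions` and `SealCurrentEpochOptions` are built is
/// elided and depends on the caller):
///
/// ```ignore
/// let mut local = hummock_storage.new_local(options).await;
/// local.init(init_options_for_epoch_1).await?; // bind to the first epoch
/// local.insert(key, new_value, None)?;         // buffered in the mem table
/// local.flush().await?;                        // mem table -> shared-buffer imm
/// local.seal_current_epoch(2, seal_options);   // advance to the next epoch
/// ```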
pub struct LocalHummockStorage {
    mem_table: MemTable,

    spill_offset: u16,
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,

    instance_guard: LocalInstanceGuard,

    /// Read handle.
    read_version: HummockReadVersionRef,

    /// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
    /// It's used by executors in different CNs to synchronize states.
    ///
    /// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be
    /// persisted, so we won't have duplicate data.
    ///
    /// This also handles a corner case where an executor doing replication
    /// is scheduled to the same CN as its upstream executor.
    /// In that case, we use this flag to avoid reading the same data twice,
    /// by ignoring the replicated `ReadVersion`.
    is_replicated: bool,

    /// Whether to send imms to the uploader on every flush.
    upload_on_flush: bool,

    /// Event sender.
    event_sender: HummockEventSender,

    memory_limiter: Arc<MemoryLimiter>,

    hummock_version_reader: HummockVersionReader,

    stats: Arc<HummockStateStoreMetrics>,

    write_limiter: WriteLimiterRef,

    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,

    mem_table_spill_threshold: usize,
}

impl LocalHummockFlushedSnapshotReader {
    async fn get_flushed<'a, O>(
        hummock_version_reader: &'a HummockVersionReader,
        read_version: &HummockReadVersionRef,
        user_key: UserKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let table_key_range = (
            Bound::Included(user_key.table_key.clone()),
            Bound::Included(user_key.table_key.clone()),
        );

        let (table_key_range, read_snapshot) =
            read_filter_for_version(MAX_EPOCH, user_key.table_id, table_key_range, read_version)?;

        if is_empty_key_range(&table_key_range) {
            return Ok(None);
        }

        hummock_version_reader
            .get(
                user_key.table_key,
                MAX_EPOCH,
                user_key.table_id,
                read_options,
                read_snapshot,
                on_key_value_fn,
            )
            .await
    }

    async fn iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
            )
            .await
    }

    async fn rev_iter_flushed(
        &self,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<HummockStorageRevIterator> {
        let (table_key_range, read_snapshot) = read_filter_for_version(
            MAX_EPOCH,
            self.table_id,
            table_key_range,
            &self.read_version,
        )?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                MAX_EPOCH,
                self.table_id,
                read_options,
                read_snapshot,
                None,
            )
            .await
    }
}

impl LocalHummockStorage {
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

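    /// The epoch tag for data currently buffered in the mem table. Multiple
    /// flushes within the same epoch are told apart by `spill_offset`: for
    /// example (illustrative values), two spills in epoch `E` are tagged
    /// `EpochWithGap::new(E, 0)` and `EpochWithGap::new(E, 1)`, keeping them
    /// distinguishable and correctly ordered within the epoch.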
    fn current_epoch_with_gap(&self) -> EpochWithGap {
        EpochWithGap::new(self.epoch(), self.spill_offset)
    }

    fn mem_table_iter(&self) -> MemTableHummockIterator<'_> {
        MemTableHummockIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    fn mem_table_rev_iter(&self) -> MemTableHummockRevIterator<'_> {
        MemTableHummockRevIterator::new(
            &self.mem_table.buffer,
            self.current_epoch_with_gap(),
            self.table_id,
        )
    }

    async fn iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .iter_with_memtable(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_iter()),
            )
            .await
    }

    async fn rev_iter_all(
        &self,
        table_key_range: TableKeyRange,
        epoch: u64,
        read_options: ReadOptions,
    ) -> StorageResult<LocalHummockStorageRevIterator<'_>> {
        let (table_key_range, read_snapshot) =
            read_filter_for_version(epoch, self.table_id, table_key_range, &self.read_version)?;

        self.hummock_version_reader
            .rev_iter(
                table_key_range,
                epoch,
                self.table_id,
                read_options,
                read_snapshot,
                Some(self.mem_table_rev_iter()),
            )
            .await
    }
}

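/// A read-only view of data that has already left the mem table (shared-buffer
/// imms and SSTs). It reads at `MAX_EPOCH`, i.e. it always sees the latest
/// flushed version, and deliberately skips the unflushed mem table of the
/// owning `LocalHummockStorage`.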
#[derive(Clone)]
pub struct LocalHummockFlushedSnapshotReader {
    table_id: TableId,
    read_version: HummockReadVersionRef,
    hummock_version_reader: HummockVersionReader,
}

impl StateStoreGet for LocalHummockFlushedSnapshotReader {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        Self::get_flushed(
            &self.hummock_version_reader,
            &self.read_version,
            key,
            read_options,
            on_key_value_fn,
        )
        .await
    }
}

impl StateStoreRead for LocalHummockFlushedSnapshotReader {
    type Iter = HummockStorageIterator;
    type RevIter = HummockStorageRevIterator;

    fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
        self.iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_iter"))
    }

    fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
        self.rev_iter_flushed(key_range, read_options)
            .instrument(tracing::trace_span!("hummock_rev_iter"))
    }
}

impl StateStoreGet for LocalHummockStorage {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        let key = UserKey::new(self.table_id, key);
        match self.mem_table.buffer.get(&key.table_key) {
            None => {
                LocalHummockFlushedSnapshotReader::get_flushed(
                    &self.hummock_version_reader,
                    &self.read_version,
                    key,
                    read_options,
                    on_key_value_fn,
                )
                .await
            }
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(on_key_value_fn(
                    FullKey::new_with_gap_epoch(
                        self.table_id,
                        key.table_key.to_ref(),
                        self.current_epoch_with_gap(),
                    ),
                    value.as_ref(),
                )?)),
                KeyOp::Delete(_) => Ok(None),
            },
        }
    }
}

impl LocalStateStore for LocalHummockStorage {
    type FlushedSnapshotReader = LocalHummockFlushedSnapshotReader;
    type Iter<'a> = LocalHummockStorageIterator<'a>;
    type RevIter<'a> = LocalHummockStorageRevIterator<'a>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
        assert_eq!(
            r_vnode_exclusive - l_vnode_inclusive,
            1,
377            "read range {:?} for table {} iter contains more than one vnode",
            key_range,
            self.table_id
        );
        self.rev_iter_all(key_range.clone(), self.epoch(), read_options)
            .await
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.new_flushed_snapshot_reader_inner()
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.read_version.read().latest_watermark(vnode)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };

        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.mem_table.delete(key, old_val)?;

        Ok(())
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap_impl(vnodes).await
    }
}
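
// A sketch of the write API semantics above (hypothetical keys and values):
// with op consistency enabled, the caller passes the expected old value so the
// mem table can sanity-check it.
//
//     local.insert(key.clone(), Bytes::from_static(b"v1"), None)?;
//     local.insert(key.clone(), Bytes::from_static(b"v2"), Some(Bytes::from_static(b"v1")))?;
//     local.delete(key, Bytes::from_static(b"v2"))?;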

impl StateStoreWriteEpochControl for LocalHummockStorage {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let mut old_values = if self.is_flush_old_value() {
            Some(Vec::with_capacity(buffer.len()))
        } else {
            None
        };
        let sanity_check_flushed_snapshot_reader = if sanity_check_enabled() {
            Some(self.new_flushed_snapshot_reader_inner())
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Currently, some executors do not strictly comply with these semantics. As
                // a workaround, you may disable the check by initializing the
                // state store with `is_consistent_op=false`.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Insert(value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(Bytes::new());
                    }
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Delete));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_reader) = &sanity_check_flushed_snapshot_reader {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_reader,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, SharedBufferValue::Update(new_value)));
                    if let Some(old_values) = &mut old_values {
                        old_values.push(old_value);
                    }
                }
            }
        }
        self.flush_inner(kv_pairs, old_values).await
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        if self.mem_table_spill_threshold != 0
            && self.mem_table.kv_size.size() > self.mem_table_spill_threshold
        {
            if self.spill_offset < MAX_SPILL_TIMES {
                let table_id_label = self.table_id.to_string();
                self.flush().await?;
                self.stats
                    .mem_table_spill_counts
                    .with_label_values(&[table_id_label.as_str()])
                    .inc();
            } else {
                tracing::warn!("mem table spill skipped: the spill offset has exhausted the available gap-epoch range");
            }
        }

        Ok(())
    }
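
    // Illustrative spill arithmetic for `try_flush` above (hypothetical
    // threshold, not a default): with `mem_table_spill_threshold = 4 << 20`, a
    // mem table that grows past 4 MiB is spilled early into a shared-buffer
    // imm tagged with the current epoch and `spill_offset`, and at most
    // `MAX_SPILL_TIMES` spills may happen within a single epoch.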

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        let epoch = options.epoch;
        wait_for_epoch(&self.version_update_notifier_tx, epoch.prev, self.table_id).await?;
        assert_eq!(
            self.epoch.replace(epoch),
            None,
            "local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.read_version.write().init();
        if !self.is_replicated {
            self.event_sender
                .send(HummockEvent::InitEpoch {
                    instance_id: self.instance_id(),
                    init_epoch: options.epoch.curr,
                })
                .map_err(|_| {
                    HummockError::other("failed to send InitEpoch. maybe shutting down")
                })?;
        }
        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, mut opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if !self.is_replicated {
            if self.upload_on_flush {
                debug_assert_eq!(self.read_version.write().pending_imm_size(), 0);
            } else {
                let pending_imms = self.read_version.write().start_upload_pending_imms();
                if !pending_imms.is_empty()
                    && self
                        .event_sender
                        .send(HummockEvent::ImmToUploader {
                            instance_id: self.instance_id(),
                            imms: pending_imms,
                        })
                        .is_err()
                {
                    warn!("failed to send ImmToUploader during seal. maybe shutting down");
                }
            }
        }

        if let Some(new_level) = &opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(new_level);
            self.op_consistency_level.update(new_level);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have initialized the epoch before sealing the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        self.spill_offset = 0;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        // Only update the `PkPrefix` watermark for reads.
        if let Some((direction, watermarks, WatermarkSerdeType::PkPrefix)) =
            &mut opts.table_watermarks
        {
            let mut read_version = self.read_version.write();
            read_version.filter_regress_watermarks(watermarks);
            if !watermarks.is_empty() {
                read_version.update(VersionUpdate::NewTableWatermark {
                    direction: *direction,
                    epoch: prev_epoch,
                    vnode_watermarks: watermarks.clone(),
                    watermark_type: WatermarkSerdeType::PkPrefix,
                });
            }
        }

        if !self.is_replicated
            && self
                .event_sender
                .send(HummockEvent::LocalSealEpoch {
                    instance_id: self.instance_id(),
                    next_epoch,
                    opts,
                })
                .is_err()
        {
            warn!("failed to send LocalSealEpoch. maybe shutting down");
        }
    }
}
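
// Illustrative epoch bookkeeping for `seal_current_epoch` (hypothetical
// values): after `init` with `EpochPair { curr: 3, prev: 2 }`, sealing with
// `next_epoch = 4` leaves `self.epoch` as `EpochPair { curr: 4, prev: 3 }` and
// resets `spill_offset` to 0, so flushes in the new epoch start at gap 0 again.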

impl LocalHummockStorage {
    async fn update_vnode_bitmap_impl(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StorageResult<Arc<Bitmap>> {
        wait_for_epoch(
            &self.version_update_notifier_tx,
            self.epoch.expect("should have init").prev,
            self.table_id,
        )
        .await?;
        assert!(!self.mem_table.is_dirty());
        let mut read_version = self.read_version.write();
        assert!(
            read_version.staging().is_empty(),
            "There is uncommitted staging data in read version table_id {:?} instance_id {:?} on vnode bitmap update",
            self.table_id(),
            self.instance_id()
        );
        Ok(read_version.update_vnode_bitmap(vnodes))
    }

    fn new_flushed_snapshot_reader_inner(&self) -> LocalHummockFlushedSnapshotReader {
        LocalHummockFlushedSnapshotReader {
            table_id: self.table_id,
            read_version: self.read_version.clone(),
            hummock_version_reader: self.hummock_version_reader.clone(),
        }
    }

    async fn flush_inner(
        &mut self,
        sorted_items: Vec<SharedBufferItem>,
        old_values: Option<Vec<Bytes>>,
    ) -> StorageResult<usize> {
        let epoch = self.epoch();
        let table_id = self.table_id;

        let table_id_label = table_id.to_string();
        self.stats
            .write_batch_tuple_counts
            .with_label_values(&[table_id_label.as_str()])
            .inc_by(sorted_items.len() as _);
        let timer = self
            .stats
            .write_batch_duration
            .with_label_values(&[table_id_label.as_str()])
            .start_timer();

        let imm_size = if !sorted_items.is_empty() {
            let (size, old_value_size) =
                SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

            self.write_limiter.wait_permission(self.table_id).await;
            let limiter = self.memory_limiter.as_ref();
            let tracker = match limiter.try_require_memory(size as u64) {
                Some(tracker) => tracker,
                _ => {
                    warn!(
                        "blocked at requiring memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    self.event_sender
                        .send(HummockEvent::BufferMayFlush)
                        .expect("should be able to send");
                    let tracker = limiter
                        .require_memory(size as u64)
                        .instrument_await("hummock_require_memory".verbose())
                        .await;
                    warn!(
                        "successfully acquired memory: {}, current {}",
                        size,
                        limiter.get_memory_usage()
                    );
                    tracker
                }
            };

            let old_values = old_values.map(|old_values| {
                SharedBufferBatchOldValues::new(
                    old_values,
                    old_value_size,
                    self.stats.old_value_size.clone(),
                )
            });

            let instance_id = self.instance_guard.instance_id;
            let imm = SharedBufferBatch::build_shared_buffer_batch(
                epoch,
                self.spill_offset,
                sorted_items,
                old_values,
                size,
                table_id,
                Some(tracker),
            );
            self.spill_offset += 1;
            let imm_size = imm.size();
            let mut read_version = self.read_version.write();
            read_version.add_imm(imm);

            // Send the pending imms to the uploader.
            if !self.is_replicated
                && (self.upload_on_flush
                    || read_version.pending_imm_size() >= self.mem_table_spill_threshold)
            {
                let imms = read_version.start_upload_pending_imms();
                self.event_sender
                    .send(HummockEvent::ImmToUploader { instance_id, imms })
                    .map_err(|_| {
                        HummockError::other("failed to send imm to uploader. maybe shutting down")
                    })?;
            }
            imm_size
        } else {
            0
        };

        timer.observe_duration();

        self.stats
            .write_batch_size
            .with_label_values(&[table_id_label.as_str()])
            .observe(imm_size as _);
        Ok(imm_size)
    }
}

impl LocalHummockStorage {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        instance_guard: LocalInstanceGuard,
        read_version: HummockReadVersionRef,
        hummock_version_reader: HummockVersionReader,
        event_sender: HummockEventSender,
        memory_limiter: Arc<MemoryLimiter>,
        write_limiter: WriteLimiterRef,
        option: NewLocalOptions,
        version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
        mem_table_spill_threshold: usize,
    ) -> Self {
        let stats = hummock_version_reader.stats().clone();
        Self {
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            spill_offset: 0,
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            is_replicated: option.is_replicated,
            instance_guard,
            read_version,
            event_sender,
            memory_limiter,
            hummock_version_reader,
            stats,
            write_limiter,
            version_update_notifier_tx,
            mem_table_spill_threshold,
            upload_on_flush: option.upload_on_flush,
        }
    }

    /// See `HummockReadVersion::update` for more details.
    pub fn read_version(&self) -> HummockReadVersionRef {
        self.read_version.clone()
    }

    pub fn table_id(&self) -> TableId {
        self.instance_guard.table_id
    }

    pub fn instance_id(&self) -> u64 {
        self.instance_guard.instance_id
    }

    fn is_flush_old_value(&self) -> bool {
        matches!(
            &self.op_consistency_level,
            OpConsistencyLevel::ConsistentOldValue {
                is_log_store: true,
                ..
            }
        )
    }
}

pub type StagingDataIterator = MergeIterator<
    HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
    HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Forward,
        StagingDataIterator,
        SstableIterator,
        ConcatIteratorInner<SstableIterator>,
        MemTableHummockIterator<'a>,
    >,
>;

pub type StorageRevIteratorPayloadInner<'a> = MergeIterator<
    HummockIteratorUnion<
        Backward,
        StagingDataRevIterator,
        BackwardSstableIterator,
        ConcatIteratorInner<BackwardSstableIterator>,
        MemTableHummockRevIterator<'a>,
    >,
>;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type HummockStorageRevIterator = HummockStorageRevIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;
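
// The `'static` aliases are for iterators that carry no mem-table borrow
// (e.g. the flushed-snapshot read path), while the `Local*` aliases borrow the
// owning storage's mem table for `'a` through `MemTableHummockIterator` /
// `MemTableHummockRevIterator`.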

pub struct HummockStorageIteratorInner<'a> {
    inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}
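
// A minimal consumption sketch (assuming `iter` was obtained from one of the
// `iter`/`rev_iter` methods above; error handling elided):
//
//     while let Some((full_key, value)) = iter.try_next().await? {
//         // the yielded refs borrow from the iterator and stay valid only
//         // until the next `try_next` call
//     }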

impl<'a> HummockStorageIteratorInner<'a> {
    pub fn new(
        inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

#[derive(Default)]
pub struct ForwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
    overlapping_iters: Vec<SstableIterator>,
    staging_iters:
        Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockIterator<'_>>,
    ) -> HummockStorageIteratorPayloadInner<'_> {
        // Build the merged payload over all collected sub-iterators.
        let staging_iter = StagingDataIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}
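
// The payload merges staging data (imms and spilled SSTs), committed SSTs
// (overlapping and non-overlapping levels), and the optional unflushed mem
// table into a single stream; which version of a key is visible is then
// resolved by epoch in the wrapping `UserIterator` / `BackwardUserIterator`.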

pub struct HummockStorageRevIteratorInner<'a> {
    inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
    initial_read: bool,
    stats_guard: IterLocalMetricsGuard,
}

impl StateStoreIter for HummockStorageRevIteratorInner<'_> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        let iter = &mut self.inner;
        if !self.initial_read {
            self.initial_read = true;
        } else {
            iter.next().await?;
        }

        if iter.is_valid() {
            Ok(Some((iter.key(), iter.value())))
        } else {
            Ok(None)
        }
    }
}

impl<'a> HummockStorageRevIteratorInner<'a> {
    pub fn new(
        inner: BackwardUserIterator<StorageRevIteratorPayloadInner<'a>>,
        metrics: Arc<HummockStateStoreMetrics>,
        table_id: TableId,
        mut local_stats: StoreLocalStatistic,
    ) -> Self {
        local_stats.found_key = inner.is_valid();
        local_stats.sub_iter_count = local_stats.staging_imm_iter_count
            + local_stats.staging_sst_iter_count
            + local_stats.overlapping_iter_count
            + local_stats.non_overlapping_iter_count;
        Self {
            inner,
            initial_read: false,
            stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats),
        }
    }
}

impl Drop for HummockStorageRevIteratorInner<'_> {
    fn drop(&mut self) {
        self.inner
            .collect_local_statistic(&mut self.stats_guard.local_stats);
    }
}

impl IteratorFactory for ForwardIteratorFactory {
    type Direction = Forward;
    type SstableIteratorType = SstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_forward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}

#[derive(Default)]
pub struct BackwardIteratorFactory {
    non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
    overlapping_iters: Vec<BackwardSstableIterator>,
    staging_iters: Vec<
        HummockIteratorUnion<
            Backward,
            SharedBufferBatchIterator<Backward>,
            BackwardSstableIterator,
        >,
    >,
}

impl BackwardIteratorFactory {
    pub fn build(
        self,
        mem_table: Option<MemTableHummockRevIterator<'_>>,
    ) -> StorageRevIteratorPayloadInner<'_> {
        // Build the merged payload over all collected sub-iterators.
        let staging_iter = StagingDataRevIterator::new(self.staging_iters);
        MergeIterator::new(
            once(HummockIteratorUnion::First(staging_iter))
                .chain(
                    self.overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Second),
                )
                .chain(
                    self.non_overlapping_iters
                        .into_iter()
                        .map(HummockIteratorUnion::Third),
                )
                .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
        )
    }
}

impl IteratorFactory for BackwardIteratorFactory {
    type Direction = Backward;
    type SstableIteratorType = BackwardSstableIterator;

    fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
        self.staging_iters
            .push(HummockIteratorUnion::First(batch.into_backward_iter()));
    }

    fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.staging_iters.push(HummockIteratorUnion::Second(iter));
    }

    fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
        self.overlapping_iters.push(iter);
    }

    fn add_concat_sst_iter(
        &mut self,
        mut tables: Vec<SstableInfo>,
        sstable_store: SstableStoreRef,
        read_options: Arc<SstableIteratorReadOptions>,
    ) {
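        // The non-overlapping SSTs are assumed to arrive in ascending key
        // order; reverse them so the backward concat iterator consumes them
        // from last to first.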
        tables.reverse();
        self.non_overlapping_iters
            .push(ConcatIteratorInner::<Self::SstableIteratorType>::new(
                tables,
                sstable_store,
                read_options,
            ));
    }
}