risingwave_storage/
store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions};
22use futures::FutureExt;
23use futures::future::BoxFuture;
24use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
25use risingwave_common::catalog::TableId;
26use risingwave_common::license::Feature;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use risingwave_common_service::RpcNotificationClient;
29use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
30use risingwave_object_store::object::build_remote_object_store;
31use thiserror_ext::AsReport;
32
33use crate::StateStore;
34use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
35use crate::error::StorageResult;
36use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
37use crate::hummock::{
38    Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
39    SstableBlockIndex, SstableStore, SstableStoreConfig,
40};
41use crate::memory::MemoryStateStore;
42use crate::memory::sled::SledStateStore;
43use crate::monitor::{
44    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
45    ObjectStoreMetrics,
46};
47use crate::opts::StorageOpts;
48
49static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
50    Box::new(PrometheusMetricsRegistry::new(
51        GLOBAL_METRICS_REGISTRY.clone(),
52    ))
53});
54
55mod opaque_type {
56    use super::*;
57
58    pub type HummockStorageType = impl StateStore + AsHummock;
59    pub type MemoryStateStoreType = impl StateStore + AsHummock;
60    pub type SledStateStoreType = impl StateStore + AsHummock;
61
62    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
63        may_dynamic_dispatch(state_store)
64    }
65
66    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
67        may_dynamic_dispatch(may_verify(state_store))
68    }
69
70    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
71        may_dynamic_dispatch(state_store)
72    }
73}
74pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
75use opaque_type::{hummock, in_memory, sled};
76
77#[cfg(feature = "hm-trace")]
78type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;
79
80#[cfg(not(feature = "hm-trace"))]
81type Monitored<S> = MonitoredStateStore<S>;
82
83fn monitored<S: StateStore>(
84    state_store: S,
85    storage_metrics: Arc<MonitoredStorageMetrics>,
86) -> Monitored<S> {
87    let inner = {
88        #[cfg(feature = "hm-trace")]
89        {
90            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
91        }
92        #[cfg(not(feature = "hm-trace"))]
93        {
94            state_store
95        }
96    };
97    inner.monitored(storage_metrics)
98}
99
100fn inner<S>(state_store: &Monitored<S>) -> &S {
101    let inner = state_store.inner();
102    {
103        #[cfg(feature = "hm-trace")]
104        {
105            inner.inner()
106        }
107        #[cfg(not(feature = "hm-trace"))]
108        {
109            inner
110        }
111    }
112}
113
114/// The type erased [`StateStore`].
115#[derive(Clone, EnumAsInner)]
116#[allow(clippy::enum_variant_names)]
117pub enum StateStoreImpl {
118    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
119    /// `hummock` will be automatically recognized as Hummock state store.
120    ///
121    /// Example URLs:
122    ///
123    /// * `hummock+s3://bucket`
124    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
125    /// * `hummock+memory` (should only be used in 1 compute node mode)
126    HummockStateStore(Monitored<HummockStorageType>),
127    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
128    /// want speed up e2e test, you should use Hummock in-memory mode instead. Also, this state
129    /// store misses some critical implementation to ensure the correctness of persisting streaming
130    /// state. (e.g., no `read_epoch` support, no async checkpoint)
131    MemoryStateStore(Monitored<MemoryStateStoreType>),
132    SledStateStore(Monitored<SledStateStoreType>),
133}
134
135fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
136    #[cfg(not(debug_assertions))]
137    {
138        state_store
139    }
140    #[cfg(debug_assertions)]
141    {
142        use crate::store_impl::dyn_state_store::StateStorePointer;
143        StateStorePointer(Arc::new(state_store) as _)
144    }
145}
146
147fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
148    #[cfg(not(debug_assertions))]
149    {
150        state_store
151    }
152    #[cfg(debug_assertions)]
153    {
154        use std::marker::PhantomData;
155
156        use risingwave_common::util::env_var::env_var_is_true;
157        use tracing::info;
158
159        use crate::store_impl::verify::VerifyStateStore;
160
161        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
162            info!("enable verify state store");
163            Some(SledStateStore::new_temp())
164        } else {
165            info!("verify state store is not enabled");
166            None
167        };
168        VerifyStateStore {
169            actual: state_store,
170            expected,
171            _phantom: PhantomData::<()>,
172        }
173    }
174}
175
176impl StateStoreImpl {
177    fn in_memory(
178        state_store: MemoryStateStore,
179        storage_metrics: Arc<MonitoredStorageMetrics>,
180    ) -> Self {
181        // The specific type of MemoryStateStoreType in deducted here.
182        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
183    }
184
185    pub fn hummock(
186        state_store: HummockStorage,
187        storage_metrics: Arc<MonitoredStorageMetrics>,
188    ) -> Self {
189        // The specific type of HummockStateStoreType in deducted here.
190        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
191    }
192
193    pub fn sled(
194        state_store: SledStateStore,
195        storage_metrics: Arc<MonitoredStorageMetrics>,
196    ) -> Self {
197        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
198    }
199
200    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
201        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
202    }
203
204    pub fn for_test() -> Self {
205        Self::in_memory(
206            MemoryStateStore::new(),
207            Arc::new(MonitoredStorageMetrics::unused()),
208        )
209    }
210
211    pub fn as_hummock(&self) -> Option<&HummockStorage> {
212        match self {
213            StateStoreImpl::HummockStateStore(hummock) => {
214                Some(inner(hummock).as_hummock().expect("should be hummock"))
215            }
216            _ => None,
217        }
218    }
219}
220
221impl Debug for StateStoreImpl {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        match self {
224            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
225            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
226            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
227        }
228    }
229}
230
/// Expands `$body` with `$store` bound to the store held by the given `StateStoreImpl`.
///
/// The Hummock arm always expands `$body`. The memory and sled arms expand it only when
/// `debug_assertions` is enabled; otherwise they evaluate to `unimplemented!`.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::HummockStateStore($store) => $body,

            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: keep this gate as is. Expanding the body for the memory backend
                // causes a monomorphization explosion and slows down release-mode compilation.
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
                #[cfg(debug_assertions)]
                {
                    $body
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: keep this gate as is. Expanding the body for the sled backend
                // causes a monomorphization explosion and slows down release-mode compilation.
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
                #[cfg(debug_assertions)]
                {
                    $body
                }
            }
        }
    }};
}
269
270#[cfg(any(debug_assertions, test, feature = "test"))]
271pub mod verify {
272    use std::fmt::Debug;
273    use std::future::Future;
274    use std::marker::PhantomData;
275    use std::ops::Deref;
276    use std::sync::Arc;
277
278    use bytes::Bytes;
279    use risingwave_common::bitmap::Bitmap;
280    use risingwave_common::hash::VirtualNode;
281    use risingwave_hummock_sdk::HummockReadEpoch;
282    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
283    use tracing::log::warn;
284
285    use crate::error::StorageResult;
286    use crate::hummock::HummockStorage;
287    use crate::store::*;
288    use crate::store_impl::AsHummock;
289
290    #[expect(dead_code)]
291    fn assert_result_eq<Item: PartialEq + Debug, E>(
292        first: &std::result::Result<Item, E>,
293        second: &std::result::Result<Item, E>,
294    ) {
295        match (first, second) {
296            (Ok(first), Ok(second)) => {
297                if first != second {
298                    warn!("result different: {:?} {:?}", first, second);
299                }
300                assert_eq!(first, second);
301            }
302            (Err(_), Err(_)) => {}
303            _ => {
304                warn!("one success and one failed");
305                panic!("result not equal");
306            }
307        }
308    }
309
310    #[derive(Clone)]
311    pub struct VerifyStateStore<A, E, T = ()> {
312        pub actual: A,
313        pub expected: Option<E>,
314        pub _phantom: PhantomData<T>,
315    }
316
317    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
318        fn as_hummock(&self) -> Option<&HummockStorage> {
319            self.actual.as_hummock()
320        }
321    }
322
323    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
324        async fn on_key_value<O: Send + 'static>(
325            &self,
326            key: TableKey<Bytes>,
327            read_options: ReadOptions,
328            on_key_value_fn: impl KeyValueFn<O>,
329        ) -> StorageResult<Option<O>> {
330            let actual: Option<(FullKey<Bytes>, Bytes)> = self
331                .actual
332                .on_key_value(key.clone(), read_options.clone(), |key, value| {
333                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
334                })
335                .await?;
336            if let Some(expected) = &self.expected {
337                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
338                    .on_key_value(key, read_options, |key, value| {
339                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
340                    })
341                    .await?;
342                assert_eq!(
343                    actual
344                        .as_ref()
345                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
346                    expected
347                        .as_ref()
348                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
349                );
350            }
351
352            actual
353                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
354                .transpose()
355        }
356    }
357
358    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
359        for VerifyStateStore<A, E>
360    {
361        fn nearest<O: Send + 'static>(
362            &self,
363            vec: Vector,
364            options: VectorNearestOptions,
365            on_nearest_item_fn: impl OnNearestItemFn<O>,
366        ) -> impl StorageFuture<'_, Vec<O>> {
367            self.actual.nearest(vec, options, on_nearest_item_fn)
368        }
369    }
370
371    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
372        type Iter = impl StateStoreReadIter;
373        type RevIter = impl StateStoreReadIter;
374
375        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
376        // fail to compile.
377        #[expect(clippy::manual_async_fn)]
378        fn iter(
379            &self,
380            key_range: TableKeyRange,
381
382            read_options: ReadOptions,
383        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
384            async move {
385                let actual = self
386                    .actual
387                    .iter(key_range.clone(), read_options.clone())
388                    .await?;
389                let expected = if let Some(expected) = &self.expected {
390                    Some(expected.iter(key_range, read_options).await?)
391                } else {
392                    None
393                };
394
395                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
396            }
397        }
398
399        #[expect(clippy::manual_async_fn)]
400        fn rev_iter(
401            &self,
402            key_range: TableKeyRange,
403
404            read_options: ReadOptions,
405        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
406            async move {
407                let actual = self
408                    .actual
409                    .rev_iter(key_range.clone(), read_options.clone())
410                    .await?;
411                let expected = if let Some(expected) = &self.expected {
412                    Some(expected.rev_iter(key_range, read_options).await?)
413                } else {
414                    None
415                };
416
417                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
418            }
419        }
420    }
421
422    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
423        type ChangeLogIter = impl StateStoreReadChangeLogIter;
424
425        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
426            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
427            if let Some(expected) = &self.expected {
428                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
429            }
430            Ok(actual)
431        }
432
433        async fn iter_log(
434            &self,
435            epoch_range: (u64, u64),
436            key_range: TableKeyRange,
437            options: ReadLogOptions,
438        ) -> StorageResult<Self::ChangeLogIter> {
439            let actual = self
440                .actual
441                .iter_log(epoch_range, key_range.clone(), options.clone())
442                .await?;
443            let expected = if let Some(expected) = &self.expected {
444                Some(expected.iter_log(epoch_range, key_range, options).await?)
445            } else {
446                None
447            };
448
449            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
450        }
451    }
452
453    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
454        for VerifyStateStore<A, E, T>
455    where
456        for<'a> T::ItemRef<'a>: PartialEq + Debug,
457    {
458        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
459            let actual = self.actual.try_next().await?;
460            if let Some(expected) = self.expected.as_mut() {
461                let expected = expected.try_next().await?;
462                assert_eq!(actual, expected);
463            }
464            Ok(actual)
465        }
466    }
467
468    fn verify_iter<T: IterItem>(
469        actual: impl StateStoreIter<T>,
470        expected: Option<impl StateStoreIter<T>>,
471    ) -> impl StateStoreIter<T>
472    where
473        for<'a> T::ItemRef<'a>: PartialEq + Debug,
474    {
475        VerifyStateStore {
476            actual,
477            expected,
478            _phantom: PhantomData::<T>,
479        }
480    }
481
482    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
483        type FlushedSnapshotReader =
484            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
485
486        type Iter<'a> = impl StateStoreIter + 'a;
487        type RevIter<'a> = impl StateStoreIter + 'a;
488
489        #[expect(clippy::manual_async_fn)]
490        fn iter(
491            &self,
492            key_range: TableKeyRange,
493            read_options: ReadOptions,
494        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
495            async move {
496                let actual = self
497                    .actual
498                    .iter(key_range.clone(), read_options.clone())
499                    .await?;
500                let expected = if let Some(expected) = &self.expected {
501                    Some(expected.iter(key_range, read_options).await?)
502                } else {
503                    None
504                };
505
506                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
507            }
508        }
509
510        #[expect(clippy::manual_async_fn)]
511        fn rev_iter(
512            &self,
513            key_range: TableKeyRange,
514            read_options: ReadOptions,
515        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
516            async move {
517                let actual = self
518                    .actual
519                    .rev_iter(key_range.clone(), read_options.clone())
520                    .await?;
521                let expected = if let Some(expected) = &self.expected {
522                    Some(expected.rev_iter(key_range, read_options).await?)
523                } else {
524                    None
525                };
526
527                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
528            }
529        }
530
531        fn insert(
532            &mut self,
533            key: TableKey<Bytes>,
534            new_val: Bytes,
535            old_val: Option<Bytes>,
536        ) -> StorageResult<()> {
537            if let Some(expected) = &mut self.expected {
538                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
539            }
540            self.actual.insert(key, new_val, old_val)?;
541
542            Ok(())
543        }
544
545        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
546            if let Some(expected) = &mut self.expected {
547                expected.delete(key.clone(), old_val.clone())?;
548            }
549            self.actual.delete(key, old_val)?;
550            Ok(())
551        }
552
553        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
554            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
555            if let Some(expected) = &mut self.expected {
556                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
557            }
558            Ok(ret)
559        }
560
561        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
562            let ret = self.actual.get_table_watermark(vnode);
563            if let Some(expected) = &self.expected {
564                assert_eq!(ret, expected.get_table_watermark(vnode));
565            }
566            ret
567        }
568
569        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
570            VerifyStateStore {
571                actual: self.actual.new_flushed_snapshot_reader(),
572                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
573                _phantom: Default::default(),
574            }
575        }
576    }
577
578    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
579        for VerifyStateStore<A, E>
580    {
581        async fn flush(&mut self) -> StorageResult<usize> {
582            if let Some(expected) = &mut self.expected {
583                expected.flush().await?;
584            }
585            self.actual.flush().await
586        }
587
588        async fn try_flush(&mut self) -> StorageResult<()> {
589            if let Some(expected) = &mut self.expected {
590                expected.try_flush().await?;
591            }
592            self.actual.try_flush().await
593        }
594
595        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
596            self.actual.init(options.clone()).await?;
597            if let Some(expected) = &mut self.expected {
598                expected.init(options).await?;
599            }
600            Ok(())
601        }
602
603        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
604            if let Some(expected) = &mut self.expected {
605                expected.seal_current_epoch(next_epoch, opts.clone());
606            }
607            self.actual.seal_current_epoch(next_epoch, opts);
608        }
609    }
610
611    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
612        type Local = VerifyStateStore<A::Local, E::Local>;
613        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
614        type VectorWriter = A::VectorWriter;
615
616        fn try_wait_epoch(
617            &self,
618            epoch: HummockReadEpoch,
619            options: TryWaitEpochOptions,
620        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
621            self.actual.try_wait_epoch(epoch, options)
622        }
623
624        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
625            let expected = if let Some(expected) = &self.expected {
626                Some(expected.new_local(option.clone()).await)
627            } else {
628                None
629            };
630            VerifyStateStore {
631                actual: self.actual.new_local(option).await,
632                expected,
633                _phantom: PhantomData::<()>,
634            }
635        }
636
637        async fn new_read_snapshot(
638            &self,
639            epoch: HummockReadEpoch,
640            options: NewReadSnapshotOptions,
641        ) -> StorageResult<Self::ReadSnapshot> {
642            let expected = if let Some(expected) = &self.expected {
643                Some(expected.new_read_snapshot(epoch, options).await?)
644            } else {
645                None
646            };
647            Ok(VerifyStateStore {
648                actual: self.actual.new_read_snapshot(epoch, options).await?,
649                expected,
650                _phantom: PhantomData::<()>,
651            })
652        }
653
654        fn new_vector_writer(
655            &self,
656            options: NewVectorWriterOptions,
657        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
658            self.actual.new_vector_writer(options)
659        }
660    }
661
662    impl<A, E> Deref for VerifyStateStore<A, E> {
663        type Target = A;
664
665        fn deref(&self) -> &Self::Target {
666            &self.actual
667        }
668    }
669}
670
671impl StateStoreImpl {
672    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
673    #[allow(clippy::too_many_arguments)]
674    #[expect(clippy::borrowed_box)]
675    pub async fn new(
676        s: &str,
677        opts: Arc<StorageOpts>,
678        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
679        state_store_metrics: Arc<HummockStateStoreMetrics>,
680        object_store_metrics: Arc<ObjectStoreMetrics>,
681        storage_metrics: Arc<MonitoredStorageMetrics>,
682        compactor_metrics: Arc<CompactorMetrics>,
683        await_tree_config: Option<await_tree::Config>,
684        use_new_object_prefix_strategy: bool,
685    ) -> StorageResult<Self> {
686        const KB: usize = 1 << 10;
687        const MB: usize = 1 << 20;
688
689        let meta_cache = {
690            let mut builder = HybridCacheBuilder::new()
691                .with_name("foyer.meta")
692                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
693                .memory(opts.meta_cache_capacity_mb * MB)
694                .with_shards(opts.meta_cache_shard_num)
695                .with_eviction_config(opts.meta_cache_eviction_config.clone())
696                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
697                    u64::BITS as usize / 8 + value.estimate_size()
698                })
699                .storage(Engine::Large(
700                    LargeEngineOptions::new()
701                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
702                        .with_flushers(opts.meta_file_cache_flushers)
703                        .with_reclaimers(opts.meta_file_cache_reclaimers)
704                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
705                        .with_clean_region_threshold(
706                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
707                        )
708                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
709                        .with_blob_index_size(16 * KB)
710                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
711                            opts.meta_file_cache_fifo_probation_ratio,
712                        ))]),
713                ));
714
715            if !opts.meta_file_cache_dir.is_empty() {
716                if let Err(e) = Feature::ElasticDiskCache.check_available() {
717                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
718                } else {
719                    builder = builder
720                        .with_device_options(
721                            DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
722                                .with_capacity(opts.meta_file_cache_capacity_mb * MB)
723                                .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
724                                .with_throttle(opts.meta_file_cache_throttle.clone()),
725                        )
726                        .with_recover_mode(opts.meta_file_cache_recover_mode)
727                        .with_compression(opts.meta_file_cache_compression)
728                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
729                }
730            }
731
732            builder.build().await.map_err(HummockError::foyer_error)?
733        };
734
735        let block_cache = {
736            let mut builder = HybridCacheBuilder::new()
737                .with_name("foyer.data")
738                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
739                .with_event_listener(Arc::new(BlockCacheEventListener::new(
740                    state_store_metrics.clone(),
741                )))
742                .memory(opts.block_cache_capacity_mb * MB)
743                .with_shards(opts.block_cache_shard_num)
744                .with_eviction_config(opts.block_cache_eviction_config.clone())
745                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
746                    // FIXME(MrCroxx): Calculate block weight more accurately.
747                    u64::BITS as usize * 2 / 8 + value.raw().len()
748                })
749                .storage(Engine::Large(
750                    LargeEngineOptions::new()
751                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
752                        .with_flushers(opts.data_file_cache_flushers)
753                        .with_reclaimers(opts.data_file_cache_reclaimers)
754                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
755                        .with_clean_region_threshold(
756                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
757                        )
758                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
759                        .with_blob_index_size(16 * KB)
760                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
761                            opts.data_file_cache_fifo_probation_ratio,
762                        ))]),
763                ));
764
765            if !opts.data_file_cache_dir.is_empty() {
766                if let Err(e) = Feature::ElasticDiskCache.check_available() {
767                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
768                } else {
769                    builder = builder
770                        .with_device_options(
771                            DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
772                                .with_capacity(opts.data_file_cache_capacity_mb * MB)
773                                .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
774                                .with_throttle(opts.data_file_cache_throttle.clone()),
775                        )
776                        .with_recover_mode(opts.data_file_cache_recover_mode)
777                        .with_compression(opts.data_file_cache_compression)
778                        .with_runtime_options(opts.data_file_cache_runtime_config.clone());
779                }
780            }
781
782            builder.build().await.map_err(HummockError::foyer_error)?
783        };
784
785        let recent_filter = if opts.data_file_cache_dir.is_empty() {
786            None
787        } else {
788            Some(Arc::new(RecentFilter::new(
789                opts.cache_refill_recent_filter_layers,
790                Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
791            )))
792        };
793
794        let store = match s {
795            hummock if hummock.starts_with("hummock+") => {
796                let object_store = build_remote_object_store(
797                    hummock.strip_prefix("hummock+").unwrap(),
798                    object_store_metrics.clone(),
799                    "Hummock",
800                    Arc::new(opts.object_store_config.clone()),
801                )
802                .await;
803
804                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
805                    store: Arc::new(object_store),
806                    path: opts.data_directory.clone(),
807                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
808                    max_prefetch_block_number: opts.max_prefetch_block_number,
809                    recent_filter,
810                    state_store_metrics: state_store_metrics.clone(),
811                    use_new_object_prefix_strategy,
812
813                    meta_cache,
814                    block_cache,
815                }));
816                let notification_client =
817                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
818                let compaction_catalog_manager_ref =
819                    Arc::new(CompactionCatalogManager::new(Box::new(
820                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
821                    )));
822
823                let inner = HummockStorage::new(
824                    opts.clone(),
825                    sstable_store,
826                    hummock_meta_client.clone(),
827                    notification_client,
828                    compaction_catalog_manager_ref,
829                    state_store_metrics.clone(),
830                    compactor_metrics.clone(),
831                    await_tree_config,
832                )
833                .await?;
834
835                StateStoreImpl::hummock(inner, storage_metrics)
836            }
837
838            "in_memory" | "in-memory" => {
839                tracing::warn!(
840                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
841                );
842                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
843            }
844
845            sled if sled.starts_with("sled://") => {
846                tracing::warn!(
847                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
848                );
849                let path = sled.strip_prefix("sled://").unwrap();
850                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
851            }
852
853            other => unimplemented!("{} state store is not supported", other),
854        };
855
856        Ok(store)
857    }
858}
859
860pub trait AsHummock: Send + Sync {
861    fn as_hummock(&self) -> Option<&HummockStorage>;
862
863    fn sync(
864        &self,
865        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
866    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
867        async move {
868            if let Some(hummock) = self.as_hummock() {
869                hummock.sync(sync_table_epochs).await
870            } else {
871                Ok(SyncResult::default())
872            }
873        }
874        .boxed()
875    }
876}
877
/// `HummockStorage` is its own Hummock backend.
impl AsHummock for HummockStorage {
    /// Always returns `Some(self)`.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
883
/// `MemoryStateStore` exposes no `HummockStorage`.
impl AsHummock for MemoryStateStore {
    /// Always returns `None`, so the default `AsHummock::sync` yields `SyncResult::default()`.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
889
/// `SledStateStore` exposes no `HummockStorage`.
impl AsHummock for SledStateStore {
    /// Always returns `None`, so the default `AsHummock::sync` yields `SyncResult::default()`.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
895
896#[cfg(debug_assertions)]
897mod dyn_state_store {
898    use std::future::Future;
899    use std::ops::DerefMut;
900    use std::sync::Arc;
901
902    use bytes::Bytes;
903    use risingwave_common::bitmap::Bitmap;
904    use risingwave_common::hash::VirtualNode;
905    use risingwave_hummock_sdk::HummockReadEpoch;
906    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
907
908    use crate::error::StorageResult;
909    use crate::hummock::HummockStorage;
910    use crate::store::*;
911    use crate::store_impl::AsHummock;
912    use crate::vector::VectorDistance;
913
    /// Boxed-future counterpart of `StateStoreIter`, usable as a trait object
    /// (see `BoxStateStoreIter`).
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        /// Mirrors `StateStoreIter::try_next`; the returned item borrows from the iterator.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }
918
919    #[async_trait::async_trait]
920    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
921        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
922            self.try_next().await
923        }
924    }
925
926    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
927    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
928        fn try_next(
929            &mut self,
930        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
931            self.deref_mut().try_next()
932        }
933    }
934
935    // For StateStoreRead
936
    /// `'static` boxed iterator over `StateStoreKeyedRow` items, returned by `DynStateStoreRead`.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// `'static` boxed iterator over `StateStoreReadLogItem` items, returned by
    /// `DynStateStoreReadLog::iter_log`.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
939
    /// Trait-object-friendly point lookup; blanket-implemented below for every `StateStoreGet`.
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        /// Looks up `key` and returns the matching row as owned data, or `None`.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }
948
    /// Trait-object-friendly range reads; blanket-implemented below for every `StateStoreRead`.
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        /// Mirrors `StateStoreRead::iter`, returning the iterator boxed.
        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        /// Mirrors `StateStoreRead::rev_iter`, returning the iterator boxed.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }
965
    /// Trait-object-friendly change-log reads; blanket-implemented below for every
    /// `StateStoreReadLog`.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        /// Mirrors `StateStoreReadLog::next_epoch`.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        /// Mirrors `StateStoreReadLog::iter_log`, returning the iterator boxed.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }
976
    /// Shared (`Arc`) type-erased reader; used as `FlushedSnapshotReader` of
    /// `BoxDynLocalStateStore`.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
978
979    #[async_trait::async_trait]
980    impl<S: StateStoreGet> DynStateStoreGet for S {
981        async fn get_keyed_row(
982            &self,
983            key: TableKey<Bytes>,
984            read_options: ReadOptions,
985        ) -> StorageResult<Option<StateStoreKeyedRow>> {
986            self.on_key_value(key, read_options, move |key, value| {
987                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
988            })
989            .await
990        }
991    }
992
993    #[async_trait::async_trait]
994    impl<S: StateStoreRead> DynStateStoreRead for S {
995        async fn iter(
996            &self,
997            key_range: TableKeyRange,
998
999            read_options: ReadOptions,
1000        ) -> StorageResult<BoxStateStoreReadIter> {
1001            Ok(Box::new(self.iter(key_range, read_options).await?))
1002        }
1003
1004        async fn rev_iter(
1005            &self,
1006            key_range: TableKeyRange,
1007
1008            read_options: ReadOptions,
1009        ) -> StorageResult<BoxStateStoreReadIter> {
1010            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1011        }
1012    }
1013
1014    #[async_trait::async_trait]
1015    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1016        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1017            self.next_epoch(epoch, options).await
1018        }
1019
1020        async fn iter_log(
1021            &self,
1022            epoch_range: (u64, u64),
1023            key_range: TableKeyRange,
1024            options: ReadLogOptions,
1025        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1026            Ok(Box::new(
1027                self.iter_log(epoch_range, key_range, options).await?,
1028            ))
1029        }
1030    }
1031
1032    // For LocalStateStore
    /// Boxed keyed-row iterator that may borrow from the local store for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// Trait-object-friendly counterpart of `LocalStateStore`; blanket-implemented below for
    /// every `LocalStateStore`.
    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        /// Mirrors `LocalStateStore::iter`, returning the iterator boxed.
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Mirrors `LocalStateStore::rev_iter`, returning the iterator boxed.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Mirrors `LocalStateStore::new_flushed_snapshot_reader`, returning a type-erased reader.
        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        /// Mirrors `LocalStateStore::insert`.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        /// Mirrors `LocalStateStore::delete`.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        /// Mirrors `LocalStateStore::update_vnode_bitmap`.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        /// Mirrors `LocalStateStore::get_table_watermark`.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }
1065
1066    #[async_trait::async_trait]
1067    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1068        async fn flush(&mut self) -> StorageResult<usize>;
1069
1070        async fn try_flush(&mut self) -> StorageResult<()>;
1071
1072        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1073
1074        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1075    }
1076
1077    #[async_trait::async_trait]
1078    impl<S: LocalStateStore> DynLocalStateStore for S {
1079        async fn iter(
1080            &self,
1081            key_range: TableKeyRange,
1082            read_options: ReadOptions,
1083        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1084            Ok(Box::new(self.iter(key_range, read_options).await?))
1085        }
1086
1087        async fn rev_iter(
1088            &self,
1089            key_range: TableKeyRange,
1090            read_options: ReadOptions,
1091        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1092            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1093        }
1094
1095        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1096            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1097        }
1098
1099        fn insert(
1100            &mut self,
1101            key: TableKey<Bytes>,
1102            new_val: Bytes,
1103            old_val: Option<Bytes>,
1104        ) -> StorageResult<()> {
1105            self.insert(key, new_val, old_val)
1106        }
1107
1108        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1109            self.delete(key, old_val)
1110        }
1111
1112        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1113            self.update_vnode_bitmap(vnodes).await
1114        }
1115
1116        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1117            self.get_table_watermark(vnode)
1118        }
1119    }
1120
1121    #[async_trait::async_trait]
1122    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1123        async fn flush(&mut self) -> StorageResult<usize> {
1124            self.flush().await
1125        }
1126
1127        async fn try_flush(&mut self) -> StorageResult<()> {
1128            self.try_flush().await
1129        }
1130
1131        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1132            self.init(options).await
1133        }
1134
1135        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1136            self.seal_current_epoch(next_epoch, opts)
1137        }
1138    }
1139
1140    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1141
1142    impl LocalStateStore for BoxDynLocalStateStore {
1143        type FlushedSnapshotReader = StateStoreReadDynRef;
1144        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1145        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1146
1147        fn iter(
1148            &self,
1149            key_range: TableKeyRange,
1150            read_options: ReadOptions,
1151        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1152            (*self.0).iter(key_range, read_options)
1153        }
1154
1155        fn rev_iter(
1156            &self,
1157            key_range: TableKeyRange,
1158            read_options: ReadOptions,
1159        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1160            (*self.0).rev_iter(key_range, read_options)
1161        }
1162
1163        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1164            (*self.0).new_flushed_snapshot_reader()
1165        }
1166
1167        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1168            (*self.0).get_table_watermark(vnode)
1169        }
1170
1171        fn insert(
1172            &mut self,
1173            key: TableKey<Bytes>,
1174            new_val: Bytes,
1175            old_val: Option<Bytes>,
1176        ) -> StorageResult<()> {
1177            (*self.0).insert(key, new_val, old_val)
1178        }
1179
1180        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1181            (*self.0).delete(key, old_val)
1182        }
1183
1184        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1185            (*self.0).update_vnode_bitmap(vnodes).await
1186        }
1187    }
1188
1189    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1190    where
1191        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1192    {
1193        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1194            self.as_mut().flush()
1195        }
1196
1197        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1198            self.as_mut().try_flush()
1199        }
1200
1201        fn init(
1202            &mut self,
1203            options: InitOptions,
1204        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1205            self.as_mut().init(options)
1206        }
1207
1208        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1209            self.as_mut().seal_current_epoch(next_epoch, opts)
1210        }
1211    }
1212
    /// Trait-object-friendly counterpart of `StateStoreWriteVector`; blanket-implemented below
    /// for every `StateStoreWriteVector`.
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        /// Mirrors `StateStoreWriteVector::insert`.
        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
    }
1217
1218    #[async_trait::async_trait]
1219    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1220        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1221            self.insert(vec, info)
1222        }
1223    }
1224
1225    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1226
1227    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1228        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1229            self.0.insert(vec, info)
1230        }
1231    }
1232
1233    // For global StateStore
1234
    /// Trait-object-friendly counterpart of `StateStoreReadVector`; blanket-implemented below
    /// for every `StateStoreReadVector`.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        /// Nearest-item query that returns owned `(vector, distance, info)` tuples instead of
        /// taking a per-item callback.
        async fn nearest(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }
1243
1244    #[async_trait::async_trait]
1245    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1246        async fn nearest(
1247            &self,
1248            vec: Vector,
1249            options: VectorNearestOptions,
1250        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1251            self.nearest(vec, options, |vec, distance, info| {
1252                (
1253                    Vector::clone_from_ref(vec),
1254                    distance,
1255                    Bytes::copy_from_slice(info),
1256                )
1257            })
1258            .await
1259        }
1260    }
1261
1262    impl<P> StateStoreReadVector for StateStorePointer<P>
1263    where
1264        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1265    {
1266        async fn nearest<O: Send + 'static>(
1267            &self,
1268            vec: Vector,
1269            options: VectorNearestOptions,
1270            on_nearest_item_fn: impl OnNearestItemFn<O>,
1271        ) -> StorageResult<Vec<O>> {
1272            let output = self.as_ref().nearest(vec, options).await?;
1273            Ok(output
1274                .into_iter()
1275                .map(|(vec, distance, info)| {
1276                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1277                })
1278                .collect())
1279        }
1280    }
1281
    /// Marker trait combining `DynStateStoreRead` and `DynStateStoreReadVector`, so a read
    /// snapshot can be held as a single trait object.
    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }

    // Blanket impl: anything that satisfies the supertraits is a `DynStateStoreReadSnapshot`.
    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }

    /// Shared (`Arc`) type-erased read snapshot; the `ReadSnapshot` type of `StateStoreDynRef`.
    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
    /// Trait-object-friendly counterpart of the `StateStore` methods; blanket-implemented below
    /// for every `StateStore`, with all produced stores returned as type-erased pointers.
    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        /// Mirrors `StateStore::try_wait_epoch`.
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        /// Mirrors `StateStore::new_local`, returning the local store boxed.
        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        /// Mirrors `StateStore::new_read_snapshot`, returning the snapshot behind an `Arc`.
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        /// Mirrors `StateStore::new_vector_writer`, returning the writer boxed.
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }
1312
1313    #[async_trait::async_trait]
1314    impl<S: StateStore> DynStateStoreExt for S {
1315        async fn try_wait_epoch(
1316            &self,
1317            epoch: HummockReadEpoch,
1318            options: TryWaitEpochOptions,
1319        ) -> StorageResult<()> {
1320            self.try_wait_epoch(epoch, options).await
1321        }
1322
1323        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1324            StateStorePointer(Box::new(self.new_local(option).await))
1325        }
1326
1327        async fn new_read_snapshot(
1328            &self,
1329            epoch: HummockReadEpoch,
1330            options: NewReadSnapshotOptions,
1331        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1332            Ok(StateStorePointer(Arc::new(
1333                self.new_read_snapshot(epoch, options).await?,
1334            )))
1335        }
1336
1337        async fn new_vector_writer(
1338            &self,
1339            options: NewVectorWriterOptions,
1340        ) -> BoxDynStateStoreWriteVector {
1341            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1342        }
1343    }
1344
    /// Shared (`Arc`) type-erased state store; implements `StateStore` further below.
    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1346
    // Generates `AsRef<dyn $target_dyn_trait>` for
    // `StateStorePointer<$pointer<dyn $source_dyn_trait>>`: the pointee is re-borrowed and the
    // reference is cast to the target trait object (the source trait itself or one of its
    // supertraits in the invocations below).
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    // These `AsRef` impls satisfy the `where` clauses of the generic `StateStoreGet`,
    // `StateStoreRead` and `StateStoreReadVector` impls for `StateStorePointer<P>`.
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1365
    // Mutable twin of `state_store_pointer_dyn_as_ref`: generates
    // `AsMut<dyn $target_dyn_trait>` for `StateStorePointer<$pointer<dyn $source_dyn_trait>>`
    // by mutably re-borrowing the pointee and casting to the target trait object.
    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    // These `AsMut` impls satisfy the `where` clause of the generic
    // `StateStoreWriteEpochControl` impl for `StateStorePointer<P>`.
    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );
1383
    /// Newtype around a smart pointer `P` (in this module an `Arc<dyn ...>` or `Box<dyn ...>`)
    /// on which the static state-store traits are implemented for the type-erased stores.
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);
1386
1387    impl<P> StateStoreGet for StateStorePointer<P>
1388    where
1389        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1390    {
1391        async fn on_key_value<O: Send + 'static>(
1392            &self,
1393            key: TableKey<Bytes>,
1394            read_options: ReadOptions,
1395            on_key_value_fn: impl KeyValueFn<O>,
1396        ) -> StorageResult<Option<O>> {
1397            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1398            option
1399                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1400                .transpose()
1401        }
1402    }
1403
1404    impl<P> StateStoreRead for StateStorePointer<P>
1405    where
1406        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1407    {
1408        type Iter = BoxStateStoreReadIter;
1409        type RevIter = BoxStateStoreReadIter;
1410
1411        fn iter(
1412            &self,
1413            key_range: TableKeyRange,
1414
1415            read_options: ReadOptions,
1416        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1417            self.as_ref().iter(key_range, read_options)
1418        }
1419
1420        fn rev_iter(
1421            &self,
1422            key_range: TableKeyRange,
1423
1424            read_options: ReadOptions,
1425        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1426            self.as_ref().rev_iter(key_range, read_options)
1427        }
1428    }
1429
1430    impl StateStoreReadLog for StateStoreDynRef {
1431        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1432
1433        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1434            (*self.0).next_epoch(epoch, options).await
1435        }
1436
1437        fn iter_log(
1438            &self,
1439            epoch_range: (u64, u64),
1440            key_range: TableKeyRange,
1441            options: ReadLogOptions,
1442        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1443            (*self.0).iter_log(epoch_range, key_range, options)
1444        }
1445    }
1446
1447    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1448
1449    impl AsHummock for StateStoreDynRef {
1450        fn as_hummock(&self) -> Option<&HummockStorage> {
1451            (*self.0).as_hummock()
1452        }
1453    }
1454
1455    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1456
1457    impl StateStore for StateStoreDynRef {
1458        type Local = BoxDynLocalStateStore;
1459        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1460        type VectorWriter = BoxDynStateStoreWriteVector;
1461
1462        fn try_wait_epoch(
1463            &self,
1464            epoch: HummockReadEpoch,
1465            options: TryWaitEpochOptions,
1466        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1467            (*self.0).try_wait_epoch(epoch, options)
1468        }
1469
1470        fn new_local(
1471            &self,
1472            option: NewLocalOptions,
1473        ) -> impl Future<Output = Self::Local> + Send + '_ {
1474            (*self.0).new_local(option)
1475        }
1476
1477        async fn new_read_snapshot(
1478            &self,
1479            epoch: HummockReadEpoch,
1480            options: NewReadSnapshotOptions,
1481        ) -> StorageResult<Self::ReadSnapshot> {
1482            (*self.0).new_read_snapshot(epoch, options).await
1483        }
1484
1485        fn new_vector_writer(
1486            &self,
1487            options: NewVectorWriterOptions,
1488        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1489            (*self.0).new_vector_writer(options)
1490        }
1491    }
1492}