risingwave_storage/store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RateLimitPicker,
23};
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
27use risingwave_common::catalog::TableId;
28use risingwave_common::license::Feature;
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common_service::RpcNotificationClient;
31use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
32use risingwave_object_store::object::build_remote_object_store;
33use thiserror_ext::AsReport;
34
35use crate::StateStore;
36use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
37use crate::error::StorageResult;
38use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
39use crate::hummock::{
40    Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
41    SstableBlockIndex, SstableStore, SstableStoreConfig,
42};
43use crate::memory::MemoryStateStore;
44use crate::memory::sled::SledStateStore;
45use crate::monitor::{
46    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
47    ObjectStoreMetrics,
48};
49use crate::opts::StorageOpts;
50
51static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
52    Box::new(PrometheusMetricsRegistry::new(
53        GLOBAL_METRICS_REGISTRY.clone(),
54    ))
55});
56
57mod opaque_type {
58    use super::*;
59
60    pub type HummockStorageType = impl StateStore + AsHummock;
61    pub type MemoryStateStoreType = impl StateStore + AsHummock;
62    pub type SledStateStoreType = impl StateStore + AsHummock;
63
64    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
65        may_dynamic_dispatch(state_store)
66    }
67
68    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
69        may_dynamic_dispatch(may_verify(state_store))
70    }
71
72    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
73        may_dynamic_dispatch(state_store)
74    }
75}
76pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
77use opaque_type::{hummock, in_memory, sled};
78
79#[cfg(feature = "hm-trace")]
80type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;
81
82#[cfg(not(feature = "hm-trace"))]
83type Monitored<S> = MonitoredStateStore<S>;
84
85fn monitored<S: StateStore>(
86    state_store: S,
87    storage_metrics: Arc<MonitoredStorageMetrics>,
88) -> Monitored<S> {
89    let inner = {
90        #[cfg(feature = "hm-trace")]
91        {
92            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
93        }
94        #[cfg(not(feature = "hm-trace"))]
95        {
96            state_store
97        }
98    };
99    inner.monitored(storage_metrics)
100}
101
102fn inner<S>(state_store: &Monitored<S>) -> &S {
103    let inner = state_store.inner();
104    {
105        #[cfg(feature = "hm-trace")]
106        {
107            inner.inner()
108        }
109        #[cfg(not(feature = "hm-trace"))]
110        {
111            inner
112        }
113    }
114}
115
116/// The type-erased [`StateStore`].
117#[derive(Clone, EnumAsInner)]
118#[allow(clippy::enum_variant_names)]
119pub enum StateStoreImpl {
120    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
121    /// `hummock` will be automatically recognized as the Hummock state store.
122    ///
123    /// Example URLs:
124    ///
125    /// * `hummock+s3://bucket`
126    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
127    /// * `hummock+memory` (should only be used in single-compute-node mode)
128    HummockStateStore(Monitored<HummockStorageType>),
129    /// In-memory B-Tree state store. Should only be used in unit and integration tests. To
130    /// speed up e2e tests, use Hummock in-memory mode instead. Also, this state store lacks
131    /// some critical implementations needed to correctly persist streaming state
132    /// (e.g., no `read_epoch` support, no async checkpoint).
133    MemoryStateStore(Monitored<MemoryStateStoreType>),
134    SledStateStore(Monitored<SledStateStoreType>),
135}
136
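/// In debug builds, wraps the store in a `StateStorePointer` trait object (see the
/// `dyn_state_store` module below); release builds return the store unchanged and keep full
/// static dispatch. Dynamic dispatch here keeps the amount of monomorphized code, and hence
/// debug compile time, down.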
137fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
138    #[cfg(not(debug_assertions))]
139    {
140        state_store
141    }
142    #[cfg(debug_assertions)]
143    {
144        use crate::store_impl::dyn_state_store::StateStorePointer;
145        StateStorePointer(Arc::new(state_store) as _)
146    }
147}
148
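/// In debug builds, optionally wraps the store in `verify::VerifyStateStore`, which mirrors
/// every operation against a temporary sled store and compares the results. Verification is
/// switched on at runtime via the `ENABLE_STATE_STORE_VERIFY` environment variable; release
/// builds return the store unchanged.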
149fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
150    #[cfg(not(debug_assertions))]
151    {
152        state_store
153    }
154    #[cfg(debug_assertions)]
155    {
156        use std::marker::PhantomData;
157
158        use risingwave_common::util::env_var::env_var_is_true;
159        use tracing::info;
160
161        use crate::store_impl::verify::VerifyStateStore;
162
163        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
164            info!("enable verify state store");
165            Some(SledStateStore::new_temp())
166        } else {
167            info!("verify state store is not enabled");
168            None
169        };
170        VerifyStateStore {
171            actual: state_store,
172            expected,
173            _phantom: PhantomData::<()>,
174        }
175    }
176}
177
178impl StateStoreImpl {
179    fn in_memory(
180        state_store: MemoryStateStore,
181        storage_metrics: Arc<MonitoredStorageMetrics>,
182    ) -> Self {
183        // The specific type of MemoryStateStoreType is deduced here.
184        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
185    }
186
187    pub fn hummock(
188        state_store: HummockStorage,
189        storage_metrics: Arc<MonitoredStorageMetrics>,
190    ) -> Self {
191        // The specific type of HummockStorageType is deduced here.
192        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
193    }
194
195    pub fn sled(
196        state_store: SledStateStore,
197        storage_metrics: Arc<MonitoredStorageMetrics>,
198    ) -> Self {
199        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
200    }
201
202    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
203        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
204    }
205
206    pub fn for_test() -> Self {
207        Self::in_memory(
208            MemoryStateStore::new(),
209            Arc::new(MonitoredStorageMetrics::unused()),
210        )
211    }
212
213    pub fn as_hummock(&self) -> Option<&HummockStorage> {
214        match self {
215            StateStoreImpl::HummockStateStore(hummock) => {
216                Some(inner(hummock).as_hummock().expect("should be hummock"))
217            }
218            _ => None,
219        }
220    }
221}
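// A minimal usage sketch (test context; purely illustrative):
//
//     let store = StateStoreImpl::for_test();
//     assert!(store.as_hummock().is_none()); // backed by the in-memory store, not Hummock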
222
223impl Debug for StateStoreImpl {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        match self {
226            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
227            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
228            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
229        }
230    }
231}
232
233#[macro_export]
234macro_rules! dispatch_state_store {
235    ($impl:expr, $store:ident, $body:tt) => {{
236        use $crate::store_impl::StateStoreImpl;
237
238        match $impl {
239            StateStoreImpl::MemoryStateStore($store) => {
240                // WARNING: don't change this. Enabling the memory backend would cause
241                // monomorphization explosion and thus slow compile times in release mode.
242                #[cfg(debug_assertions)]
243                {
244                    $body
245                }
246                #[cfg(not(debug_assertions))]
247                {
248                    let _store = $store;
249                    unimplemented!("memory state store should never be used in release mode");
250                }
251            }
252
253            StateStoreImpl::SledStateStore($store) => {
254                // WARNING: don't change this. Enabling the sled backend would cause
255                // monomorphization explosion and thus slow compile times in release mode.
256                #[cfg(debug_assertions)]
257                {
258                    $body
259                }
260                #[cfg(not(debug_assertions))]
261                {
262                    let _store = $store;
263                    unimplemented!("sled state store should never be used in release mode");
264                }
265            }
266
267            StateStoreImpl::HummockStateStore($store) => $body,
268        }
269    }};
270}
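// Usage sketch for `dispatch_state_store!`: within `$body`, `$store` is bound to the concrete
// `Monitored<_>` store of the matched variant, so the body is monomorphized per backend
// (`do_something` below is a hypothetical generic helper):
//
//     dispatch_state_store!(state_store_impl, store, {
//         do_something(store).await
//     })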
271
272#[cfg(any(debug_assertions, test, feature = "test"))]
273pub mod verify {
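    //! A debugging wrapper that executes every state store operation against both an `actual`
    //! store and, when enabled, an `expected` store (a temporary sled store, see `may_verify`
    //! in the parent module), asserting that the two produce identical results.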
274    use std::fmt::Debug;
275    use std::future::Future;
276    use std::marker::PhantomData;
277    use std::ops::Deref;
278    use std::sync::Arc;
279
280    use bytes::Bytes;
281    use risingwave_common::bitmap::Bitmap;
282    use risingwave_common::hash::VirtualNode;
283    use risingwave_hummock_sdk::HummockReadEpoch;
284    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
285    use tracing::log::warn;
286
287    use crate::error::StorageResult;
288    use crate::hummock::HummockStorage;
289    use crate::store::*;
290    use crate::store_impl::AsHummock;
291
292    fn assert_result_eq<Item: PartialEq + Debug, E>(
293        first: &std::result::Result<Item, E>,
294        second: &std::result::Result<Item, E>,
295    ) {
296        match (first, second) {
297            (Ok(first), Ok(second)) => {
298                if first != second {
299                    warn!("result different: {:?} {:?}", first, second);
300                }
301                assert_eq!(first, second);
302            }
303            (Err(_), Err(_)) => {}
304            _ => {
305                warn!("one succeeded and one failed");
306                panic!("result not equal");
307            }
308        }
309    }
310
311    #[derive(Clone)]
312    pub struct VerifyStateStore<A, E, T = ()> {
313        pub actual: A,
314        pub expected: Option<E>,
315        pub _phantom: PhantomData<T>,
316    }
317
318    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
319        fn as_hummock(&self) -> Option<&HummockStorage> {
320            self.actual.as_hummock()
321        }
322    }
323
324    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
325        type Iter = impl StateStoreReadIter;
326        type RevIter = impl StateStoreReadIter;
327
328        async fn get_keyed_row(
329            &self,
330            key: TableKey<Bytes>,
331
332            read_options: ReadOptions,
333        ) -> StorageResult<Option<StateStoreKeyedRow>> {
334            let actual = self
335                .actual
336                .get_keyed_row(key.clone(), read_options.clone())
337                .await;
338            if let Some(expected) = &self.expected {
339                let expected = expected.get_keyed_row(key, read_options).await;
340                assert_result_eq(&actual, &expected);
341            }
342            actual
343        }
344
345        // TODO: remove the manual async fn once the relevant Rust compiler bug is fixed;
346        // currently a plain `async fn` here fails to compile.
347        #[expect(clippy::manual_async_fn)]
348        fn iter(
349            &self,
350            key_range: TableKeyRange,
351
352            read_options: ReadOptions,
353        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
354            async move {
355                let actual = self
356                    .actual
357                    .iter(key_range.clone(), read_options.clone())
358                    .await?;
359                let expected = if let Some(expected) = &self.expected {
360                    Some(expected.iter(key_range, read_options).await?)
361                } else {
362                    None
363                };
364
365                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
366            }
367        }
368
369        #[expect(clippy::manual_async_fn)]
370        fn rev_iter(
371            &self,
372            key_range: TableKeyRange,
373
374            read_options: ReadOptions,
375        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
376            async move {
377                let actual = self
378                    .actual
379                    .rev_iter(key_range.clone(), read_options.clone())
380                    .await?;
381                let expected = if let Some(expected) = &self.expected {
382                    Some(expected.rev_iter(key_range, read_options).await?)
383                } else {
384                    None
385                };
386
387                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
388            }
389        }
390    }
391
392    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
393        type ChangeLogIter = impl StateStoreReadChangeLogIter;
394
395        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
396            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
397            if let Some(expected) = &self.expected {
398                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
399            }
400            Ok(actual)
401        }
402
403        async fn iter_log(
404            &self,
405            epoch_range: (u64, u64),
406            key_range: TableKeyRange,
407            options: ReadLogOptions,
408        ) -> StorageResult<Self::ChangeLogIter> {
409            let actual = self
410                .actual
411                .iter_log(epoch_range, key_range.clone(), options.clone())
412                .await?;
413            let expected = if let Some(expected) = &self.expected {
414                Some(expected.iter_log(epoch_range, key_range, options).await?)
415            } else {
416                None
417            };
418
419            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
420        }
421    }
422
423    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
424        for VerifyStateStore<A, E, T>
425    where
426        for<'a> T::ItemRef<'a>: PartialEq + Debug,
427    {
428        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
429            let actual = self.actual.try_next().await?;
430            if let Some(expected) = self.expected.as_mut() {
431                let expected = expected.try_next().await?;
432                assert_eq!(actual, expected);
433            }
434            Ok(actual)
435        }
436    }
437
438    fn verify_iter<T: IterItem>(
439        actual: impl StateStoreIter<T>,
440        expected: Option<impl StateStoreIter<T>>,
441    ) -> impl StateStoreIter<T>
442    where
443        for<'a> T::ItemRef<'a>: PartialEq + Debug,
444    {
445        VerifyStateStore {
446            actual,
447            expected,
448            _phantom: PhantomData::<T>,
449        }
450    }
451
452    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
453        type FlushedSnapshotReader =
454            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
455
456        type Iter<'a> = impl StateStoreIter + 'a;
457        type RevIter<'a> = impl StateStoreIter + 'a;
458
459        async fn get(
460            &self,
461            key: TableKey<Bytes>,
462            read_options: ReadOptions,
463        ) -> StorageResult<Option<Bytes>> {
464            let actual = self.actual.get(key.clone(), read_options.clone()).await;
465            if let Some(expected) = &self.expected {
466                let expected = expected.get(key, read_options).await;
467                assert_result_eq(&actual, &expected);
468            }
469            actual
470        }
471
472        #[expect(clippy::manual_async_fn)]
473        fn iter(
474            &self,
475            key_range: TableKeyRange,
476            read_options: ReadOptions,
477        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
478            async move {
479                let actual = self
480                    .actual
481                    .iter(key_range.clone(), read_options.clone())
482                    .await?;
483                let expected = if let Some(expected) = &self.expected {
484                    Some(expected.iter(key_range, read_options).await?)
485                } else {
486                    None
487                };
488
489                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
490            }
491        }
492
493        #[expect(clippy::manual_async_fn)]
494        fn rev_iter(
495            &self,
496            key_range: TableKeyRange,
497            read_options: ReadOptions,
498        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
499            async move {
500                let actual = self
501                    .actual
502                    .rev_iter(key_range.clone(), read_options.clone())
503                    .await?;
504                let expected = if let Some(expected) = &self.expected {
505                    Some(expected.rev_iter(key_range, read_options).await?)
506                } else {
507                    None
508                };
509
510                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
511            }
512        }
513
514        fn insert(
515            &mut self,
516            key: TableKey<Bytes>,
517            new_val: Bytes,
518            old_val: Option<Bytes>,
519        ) -> StorageResult<()> {
520            if let Some(expected) = &mut self.expected {
521                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
522            }
523            self.actual.insert(key, new_val, old_val)?;
524
525            Ok(())
526        }
527
528        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
529            if let Some(expected) = &mut self.expected {
530                expected.delete(key.clone(), old_val.clone())?;
531            }
532            self.actual.delete(key, old_val)?;
533            Ok(())
534        }
535
536        async fn flush(&mut self) -> StorageResult<usize> {
537            if let Some(expected) = &mut self.expected {
538                expected.flush().await?;
539            }
540            self.actual.flush().await
541        }
542
543        async fn try_flush(&mut self) -> StorageResult<()> {
544            if let Some(expected) = &mut self.expected {
545                expected.try_flush().await?;
546            }
547            self.actual.try_flush().await
548        }
549
550        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
551            self.actual.init(options.clone()).await?;
552            if let Some(expected) = &mut self.expected {
553                expected.init(options).await?;
554            }
555            Ok(())
556        }
557
558        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
559            if let Some(expected) = &mut self.expected {
560                expected.seal_current_epoch(next_epoch, opts.clone());
561            }
562            self.actual.seal_current_epoch(next_epoch, opts);
563        }
564
565        fn epoch(&self) -> u64 {
566            let epoch = self.actual.epoch();
567            if let Some(expected) = &self.expected {
568                assert_eq!(epoch, expected.epoch());
569            }
570            epoch
571        }
572
573        fn is_dirty(&self) -> bool {
574            let ret = self.actual.is_dirty();
575            if let Some(expected) = &self.expected {
576                assert_eq!(ret, expected.is_dirty());
577            }
578            ret
579        }
580
581        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
582            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
583            if let Some(expected) = &mut self.expected {
584                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
585            }
586            Ok(ret)
587        }
588
589        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
590            let ret = self.actual.get_table_watermark(vnode);
591            if let Some(expected) = &self.expected {
592                assert_eq!(ret, expected.get_table_watermark(vnode));
593            }
594            ret
595        }
596
597        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
598            VerifyStateStore {
599                actual: self.actual.new_flushed_snapshot_reader(),
600                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
601                _phantom: Default::default(),
602            }
603        }
604    }
605
606    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
607        type Local = VerifyStateStore<A::Local, E::Local>;
608        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
609
610        fn try_wait_epoch(
611            &self,
612            epoch: HummockReadEpoch,
613            options: TryWaitEpochOptions,
614        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
615            self.actual.try_wait_epoch(epoch, options)
616        }
617
618        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
619            let expected = if let Some(expected) = &self.expected {
620                Some(expected.new_local(option.clone()).await)
621            } else {
622                None
623            };
624            VerifyStateStore {
625                actual: self.actual.new_local(option).await,
626                expected,
627                _phantom: PhantomData::<()>,
628            }
629        }
630
631        async fn new_read_snapshot(
632            &self,
633            epoch: HummockReadEpoch,
634            options: NewReadSnapshotOptions,
635        ) -> StorageResult<Self::ReadSnapshot> {
636            let expected = if let Some(expected) = &self.expected {
637                Some(expected.new_read_snapshot(epoch, options).await?)
638            } else {
639                None
640            };
641            Ok(VerifyStateStore {
642                actual: self.actual.new_read_snapshot(epoch, options).await?,
643                expected,
644                _phantom: PhantomData::<()>,
645            })
646        }
647    }
648
649    impl<A, E> Deref for VerifyStateStore<A, E> {
650        type Target = A;
651
652        fn deref(&self) -> &Self::Target {
653            &self.actual
654        }
655    }
656}
657
658impl StateStoreImpl {
659    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
660    #[allow(clippy::too_many_arguments)]
661    #[expect(clippy::borrowed_box)]
662    pub async fn new(
663        s: &str,
664        opts: Arc<StorageOpts>,
665        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
666        state_store_metrics: Arc<HummockStateStoreMetrics>,
667        object_store_metrics: Arc<ObjectStoreMetrics>,
668        storage_metrics: Arc<MonitoredStorageMetrics>,
669        compactor_metrics: Arc<CompactorMetrics>,
670        await_tree_config: Option<await_tree::Config>,
671        use_new_object_prefix_strategy: bool,
672    ) -> StorageResult<Self> {
673        const KB: usize = 1 << 10;
674        const MB: usize = 1 << 20;
675
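        // Both caches below are foyer hybrid caches: the in-memory tier is always built, while
        // the on-disk ("file cache") tier is added only when a cache directory is configured
        // and the `ElasticDiskCache` licensed feature is available.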
676        let meta_cache = {
677            let mut builder = HybridCacheBuilder::new()
678                .with_name("foyer.meta")
679                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
680                .memory(opts.meta_cache_capacity_mb * MB)
681                .with_shards(opts.meta_cache_shard_num)
682                .with_eviction_config(opts.meta_cache_eviction_config.clone())
683                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
684                    u64::BITS as usize / 8 + value.estimate_size()
685                })
686                .storage(Engine::Large);
687
688            if !opts.meta_file_cache_dir.is_empty() {
689                if let Err(e) = Feature::ElasticDiskCache.check_available() {
690                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
691                } else {
692                    builder = builder
693                        .with_device_options(
694                            DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
695                                .with_capacity(opts.meta_file_cache_capacity_mb * MB)
696                                .with_file_size(opts.meta_file_cache_file_capacity_mb * MB),
697                        )
698                        .with_recover_mode(opts.meta_file_cache_recover_mode)
699                        .with_compression(opts.meta_file_cache_compression)
700                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone())
701                        .with_large_object_disk_cache_options(
702                            LargeEngineOptions::new()
703                                .with_indexer_shards(opts.meta_file_cache_indexer_shards)
704                                .with_flushers(opts.meta_file_cache_flushers)
705                                .with_reclaimers(opts.meta_file_cache_reclaimers)
706                                .with_buffer_pool_size(
707                                    opts.meta_file_cache_flush_buffer_threshold_mb * MB,
708                                ) // 128 MiB
709                                .with_clean_region_threshold(
710                                    opts.meta_file_cache_reclaimers
711                                        + opts.meta_file_cache_reclaimers / 2,
712                                )
713                                .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
714                                .with_blob_index_size(16 * KB),
715                        );
716                    if opts.meta_file_cache_insert_rate_limit_mb > 0 {
717                        builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
718                            opts.meta_file_cache_insert_rate_limit_mb * MB,
719                        )));
720                    }
721                }
722            }
723
724            builder.build().await.map_err(HummockError::foyer_error)?
725        };
726
727        let block_cache = {
728            let mut builder = HybridCacheBuilder::new()
729                .with_name("foyer.data")
730                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
731                .with_event_listener(Arc::new(BlockCacheEventListener::new(
732                    state_store_metrics.clone(),
733                )))
734                .memory(opts.block_cache_capacity_mb * MB)
735                .with_shards(opts.block_cache_shard_num)
736                .with_eviction_config(opts.block_cache_eviction_config.clone())
737                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
738                    // FIXME(MrCroxx): Calculate block weight more accurately.
739                    u64::BITS as usize * 2 / 8 + value.raw().len()
740                })
741                .storage(Engine::Large);
742
743            if !opts.data_file_cache_dir.is_empty() {
744                if let Err(e) = Feature::ElasticDiskCache.check_available() {
745                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
746                } else {
747                    builder = builder
748                        .with_device_options(
749                            DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
750                                .with_capacity(opts.data_file_cache_capacity_mb * MB)
751                                .with_file_size(opts.data_file_cache_file_capacity_mb * MB),
752                        )
753                        .with_recover_mode(opts.data_file_cache_recover_mode)
754                        .with_compression(opts.data_file_cache_compression)
755                        .with_runtime_options(opts.data_file_cache_runtime_config.clone())
756                        .with_large_object_disk_cache_options(
757                            LargeEngineOptions::new()
758                                .with_indexer_shards(opts.data_file_cache_indexer_shards)
759                                .with_flushers(opts.data_file_cache_flushers)
760                                .with_reclaimers(opts.data_file_cache_reclaimers)
761                                .with_buffer_pool_size(
762                                    opts.data_file_cache_flush_buffer_threshold_mb * MB,
763                                ) // 128 MiB
764                                .with_clean_region_threshold(
765                                    opts.data_file_cache_reclaimers
766                                        + opts.data_file_cache_reclaimers / 2,
767                                )
768                                .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
769                                .with_blob_index_size(16 * KB),
770                        );
771                    if opts.data_file_cache_insert_rate_limit_mb > 0 {
772                        builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
773                            opts.data_file_cache_insert_rate_limit_mb * MB,
774                        )));
775                    }
776                }
777            }
778
779            builder.build().await.map_err(HummockError::foyer_error)?
780        };
781
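        // The recent filter is used by the cache-refill path (see the `cache_refill_*` options
        // in `StorageOpts`) and is only built when the data file cache is enabled.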
782        let recent_filter = if opts.data_file_cache_dir.is_empty() {
783            None
784        } else {
785            Some(Arc::new(RecentFilter::new(
786                opts.cache_refill_recent_filter_layers,
787                Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
788            )))
789        };
790
791        let store = match s {
792            hummock if hummock.starts_with("hummock+") => {
793                let object_store = build_remote_object_store(
794                    hummock.strip_prefix("hummock+").unwrap(),
795                    object_store_metrics.clone(),
796                    "Hummock",
797                    Arc::new(opts.object_store_config.clone()),
798                )
799                .await;
800
801                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
802                    store: Arc::new(object_store),
803                    path: opts.data_directory.clone(),
804                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
805                    max_prefetch_block_number: opts.max_prefetch_block_number,
806                    recent_filter,
807                    state_store_metrics: state_store_metrics.clone(),
808                    use_new_object_prefix_strategy,
809
810                    meta_cache,
811                    block_cache,
812                }));
813                let notification_client =
814                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
815                let compaction_catalog_manager_ref =
816                    Arc::new(CompactionCatalogManager::new(Box::new(
817                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
818                    )));
819
820                let inner = HummockStorage::new(
821                    opts.clone(),
822                    sstable_store,
823                    hummock_meta_client.clone(),
824                    notification_client,
825                    compaction_catalog_manager_ref,
826                    state_store_metrics.clone(),
827                    compactor_metrics.clone(),
828                    await_tree_config,
829                )
830                .await?;
831
832                StateStoreImpl::hummock(inner, storage_metrics)
833            }
834
835            "in_memory" | "in-memory" => {
836                tracing::warn!(
837                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
838                );
839                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
840            }
841
842            sled if sled.starts_with("sled://") => {
843                tracing::warn!(
844                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
845                );
846                let path = sled.strip_prefix("sled://").unwrap();
847                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
848            }
849
850            other => unimplemented!("{} state store is not supported", other),
851        };
852
853        Ok(store)
854    }
855}
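// Call sketch for `StateStoreImpl::new` (only the URL determines the backend; the remaining
// arguments are elided here):
//
//     // "hummock+s3://bucket", "hummock+minio://KEY:SECRET@minio-ip:port",
//     // "in-memory" / "in_memory", or "sled:///path/to/dir"
//     let store = StateStoreImpl::new("hummock+s3://bucket", /* ... */).await?;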
856
857pub trait AsHummock: Send + Sync {
858    fn as_hummock(&self) -> Option<&HummockStorage>;
859
860    fn sync(
861        &self,
862        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
863    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
864        async move {
865            if let Some(hummock) = self.as_hummock() {
866                hummock.sync(sync_table_epochs).await
867            } else {
868                Ok(SyncResult::default())
869            }
870        }
871        .boxed()
872    }
873}
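// Note: with the default `sync` above, non-Hummock backends (memory, sled) simply return
// `SyncResult::default()`, so callers can treat all backends uniformly.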
874
875impl AsHummock for HummockStorage {
876    fn as_hummock(&self) -> Option<&HummockStorage> {
877        Some(self)
878    }
879}
880
881impl AsHummock for MemoryStateStore {
882    fn as_hummock(&self) -> Option<&HummockStorage> {
883        None
884    }
885}
886
887impl AsHummock for SledStateStore {
888    fn as_hummock(&self) -> Option<&HummockStorage> {
889        None
890    }
891}
892
893#[cfg(debug_assertions)]
894mod dyn_state_store {
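    //! Object-safe ("dyn") counterparts of the state store traits, used by
    //! `may_dynamic_dispatch` in debug builds: the `Dyn*` traits box iterators and futures
    //! (via `async_trait`) so that a store can be held behind a trait object.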
895    use std::future::Future;
896    use std::ops::DerefMut;
897    use std::sync::Arc;
898
899    use bytes::Bytes;
900    use risingwave_common::bitmap::Bitmap;
901    use risingwave_common::hash::VirtualNode;
902    use risingwave_hummock_sdk::HummockReadEpoch;
903    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
904
905    use crate::error::StorageResult;
906    use crate::hummock::HummockStorage;
907    use crate::store::*;
908    use crate::store_impl::AsHummock;
909
910    #[async_trait::async_trait]
911    pub trait DynStateStoreIter<T: IterItem>: Send {
912        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
913    }
914
915    #[async_trait::async_trait]
916    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
917        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
918            self.try_next().await
919        }
920    }
921
922    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
923    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
924        fn try_next(
925            &mut self,
926        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
927            self.deref_mut().try_next()
928        }
929    }
930
931    // For StateStoreRead
932
933    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
934    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
935
936    #[async_trait::async_trait]
937    pub trait DynStateStoreRead: StaticSendSync {
938        async fn get_keyed_row(
939            &self,
940            key: TableKey<Bytes>,
941
942            read_options: ReadOptions,
943        ) -> StorageResult<Option<StateStoreKeyedRow>>;
944
945        async fn iter(
946            &self,
947            key_range: TableKeyRange,
948
949            read_options: ReadOptions,
950        ) -> StorageResult<BoxStateStoreReadIter>;
951
952        async fn rev_iter(
953            &self,
954            key_range: TableKeyRange,
955
956            read_options: ReadOptions,
957        ) -> StorageResult<BoxStateStoreReadIter>;
958    }
959
960    #[async_trait::async_trait]
961    pub trait DynStateStoreReadLog: StaticSendSync {
962        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
963        async fn iter_log(
964            &self,
965            epoch_range: (u64, u64),
966            key_range: TableKeyRange,
967            options: ReadLogOptions,
968        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
969    }
970
971    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
972
973    #[async_trait::async_trait]
974    impl<S: StateStoreRead> DynStateStoreRead for S {
975        async fn get_keyed_row(
976            &self,
977            key: TableKey<Bytes>,
978
979            read_options: ReadOptions,
980        ) -> StorageResult<Option<StateStoreKeyedRow>> {
981            self.get_keyed_row(key, read_options).await
982        }
983
984        async fn iter(
985            &self,
986            key_range: TableKeyRange,
987
988            read_options: ReadOptions,
989        ) -> StorageResult<BoxStateStoreReadIter> {
990            Ok(Box::new(self.iter(key_range, read_options).await?))
991        }
992
993        async fn rev_iter(
994            &self,
995            key_range: TableKeyRange,
996
997            read_options: ReadOptions,
998        ) -> StorageResult<BoxStateStoreReadIter> {
999            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1000        }
1001    }
1002
1003    #[async_trait::async_trait]
1004    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1005        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1006            self.next_epoch(epoch, options).await
1007        }
1008
1009        async fn iter_log(
1010            &self,
1011            epoch_range: (u64, u64),
1012            key_range: TableKeyRange,
1013            options: ReadLogOptions,
1014        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1015            Ok(Box::new(
1016                self.iter_log(epoch_range, key_range, options).await?,
1017            ))
1018        }
1019    }
1020
1021    // For LocalStateStore
1022    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1023    #[async_trait::async_trait]
1024    pub trait DynLocalStateStore: StaticSendSync {
1025        async fn get(
1026            &self,
1027            key: TableKey<Bytes>,
1028            read_options: ReadOptions,
1029        ) -> StorageResult<Option<Bytes>>;
1030
1031        async fn iter(
1032            &self,
1033            key_range: TableKeyRange,
1034            read_options: ReadOptions,
1035        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1036
1037        async fn rev_iter(
1038            &self,
1039            key_range: TableKeyRange,
1040            read_options: ReadOptions,
1041        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1042
1043        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1044
1045        fn insert(
1046            &mut self,
1047            key: TableKey<Bytes>,
1048            new_val: Bytes,
1049            old_val: Option<Bytes>,
1050        ) -> StorageResult<()>;
1051
1052        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1053
1054        async fn flush(&mut self) -> StorageResult<usize>;
1055
1056        async fn try_flush(&mut self) -> StorageResult<()>;
1057
1058        fn epoch(&self) -> u64;
1059
1060        fn is_dirty(&self) -> bool;
1061
1062        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1063
1064        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1065
1066        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1067
1068        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1069    }
1070
1071    #[async_trait::async_trait]
1072    impl<S: LocalStateStore> DynLocalStateStore for S {
1073        async fn get(
1074            &self,
1075            key: TableKey<Bytes>,
1076            read_options: ReadOptions,
1077        ) -> StorageResult<Option<Bytes>> {
1078            self.get(key, read_options).await
1079        }
1080
1081        async fn iter(
1082            &self,
1083            key_range: TableKeyRange,
1084            read_options: ReadOptions,
1085        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1086            Ok(Box::new(self.iter(key_range, read_options).await?))
1087        }
1088
1089        async fn rev_iter(
1090            &self,
1091            key_range: TableKeyRange,
1092            read_options: ReadOptions,
1093        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1094            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1095        }
1096
1097        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1098            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1099        }
1100
1101        fn insert(
1102            &mut self,
1103            key: TableKey<Bytes>,
1104            new_val: Bytes,
1105            old_val: Option<Bytes>,
1106        ) -> StorageResult<()> {
1107            self.insert(key, new_val, old_val)
1108        }
1109
1110        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1111            self.delete(key, old_val)
1112        }
1113
1114        async fn flush(&mut self) -> StorageResult<usize> {
1115            self.flush().await
1116        }
1117
1118        async fn try_flush(&mut self) -> StorageResult<()> {
1119            self.try_flush().await
1120        }
1121
1122        fn epoch(&self) -> u64 {
1123            self.epoch()
1124        }
1125
1126        fn is_dirty(&self) -> bool {
1127            self.is_dirty()
1128        }
1129
1130        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1131            self.init(options).await
1132        }
1133
1134        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1135            self.seal_current_epoch(next_epoch, opts)
1136        }
1137
1138        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1139            self.update_vnode_bitmap(vnodes).await
1140        }
1141
1142        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1143            self.get_table_watermark(vnode)
1144        }
1145    }
1146
1147    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1148
1149    impl LocalStateStore for BoxDynLocalStateStore {
1150        type FlushedSnapshotReader = StateStoreReadDynRef;
1151        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1152        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1153
1154        fn get(
1155            &self,
1156            key: TableKey<Bytes>,
1157            read_options: ReadOptions,
1158        ) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_ {
1159            (*self.0).get(key, read_options)
1160        }
1161
1162        fn iter(
1163            &self,
1164            key_range: TableKeyRange,
1165            read_options: ReadOptions,
1166        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1167            (*self.0).iter(key_range, read_options)
1168        }
1169
1170        fn rev_iter(
1171            &self,
1172            key_range: TableKeyRange,
1173            read_options: ReadOptions,
1174        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1175            (*self.0).rev_iter(key_range, read_options)
1176        }
1177
1178        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1179            (*self.0).new_flushed_snapshot_reader()
1180        }
1181
1182        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1183            (*self.0).get_table_watermark(vnode)
1184        }
1185
1186        fn insert(
1187            &mut self,
1188            key: TableKey<Bytes>,
1189            new_val: Bytes,
1190            old_val: Option<Bytes>,
1191        ) -> StorageResult<()> {
1192            (*self.0).insert(key, new_val, old_val)
1193        }
1194
1195        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1196            (*self.0).delete(key, old_val)
1197        }
1198
1199        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1200            (*self.0).flush()
1201        }
1202
1203        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1204            (*self.0).try_flush()
1205        }
1206
1207        fn epoch(&self) -> u64 {
1208            (*self.0).epoch()
1209        }
1210
1211        fn is_dirty(&self) -> bool {
1212            (*self.0).is_dirty()
1213        }
1214
1215        fn init(
1216            &mut self,
1217            options: InitOptions,
1218        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1219            (*self.0).init(options)
1220        }
1221
1222        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1223            (*self.0).seal_current_epoch(next_epoch, opts)
1224        }
1225
1226        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1227            (*self.0).update_vnode_bitmap(vnodes).await
1228        }
1229    }
1230
1231    // For global StateStore
1232
1233    #[async_trait::async_trait]
1234    pub trait DynStateStoreExt: StaticSendSync {
1235        async fn try_wait_epoch(
1236            &self,
1237            epoch: HummockReadEpoch,
1238            options: TryWaitEpochOptions,
1239        ) -> StorageResult<()>;
1240
1241        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1242        async fn new_read_snapshot(
1243            &self,
1244            epoch: HummockReadEpoch,
1245            options: NewReadSnapshotOptions,
1246        ) -> StorageResult<StateStoreReadDynRef>;
1247    }
1248
1249    #[async_trait::async_trait]
1250    impl<S: StateStore> DynStateStoreExt for S {
1251        async fn try_wait_epoch(
1252            &self,
1253            epoch: HummockReadEpoch,
1254            options: TryWaitEpochOptions,
1255        ) -> StorageResult<()> {
1256            self.try_wait_epoch(epoch, options).await
1257        }
1258
1259        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1260            StateStorePointer(Box::new(self.new_local(option).await))
1261        }
1262
1263        async fn new_read_snapshot(
1264            &self,
1265            epoch: HummockReadEpoch,
1266            options: NewReadSnapshotOptions,
1267        ) -> StorageResult<StateStoreReadDynRef> {
1268            Ok(StateStorePointer(Arc::new(
1269                self.new_read_snapshot(epoch, options).await?,
1270            )))
1271        }
1272    }
1273
1274    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1275
1276    macro_rules! state_store_pointer_dyn_as_ref {
1277        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1278            impl AsRef<dyn $target_dyn_trait>
1279                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1280            {
1281                fn as_ref(&self) -> &dyn $target_dyn_trait {
1282                    (&*self.0) as _
1283                }
1284            }
1285        };
1286    }
1287
1288    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1289
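    /// A thin newtype around an `Arc` or `Box` of one of the `Dyn*` trait objects; the
    /// statically-dispatched storage traits are implemented on this pointer type.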
1290    #[derive(Clone)]
1291    pub struct StateStorePointer<P>(pub(crate) P);
1292
1293    impl<P> StateStoreRead for StateStorePointer<P>
1294    where
1295        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StaticSendSync,
1296    {
1297        type Iter = BoxStateStoreReadIter;
1298        type RevIter = BoxStateStoreReadIter;
1299
1300        fn get_keyed_row(
1301            &self,
1302            key: TableKey<Bytes>,
1303
1304            read_options: ReadOptions,
1305        ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
1306            self.as_ref().get_keyed_row(key, read_options)
1307        }
1308
1309        fn iter(
1310            &self,
1311            key_range: TableKeyRange,
1312
1313            read_options: ReadOptions,
1314        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1315            self.as_ref().iter(key_range, read_options)
1316        }
1317
1318        fn rev_iter(
1319            &self,
1320            key_range: TableKeyRange,
1321
1322            read_options: ReadOptions,
1323        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1324            self.as_ref().rev_iter(key_range, read_options)
1325        }
1326    }
1327
1328    impl StateStoreReadLog for StateStoreDynRef {
1329        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1330
1331        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1332            (*self.0).next_epoch(epoch, options).await
1333        }
1334
1335        fn iter_log(
1336            &self,
1337            epoch_range: (u64, u64),
1338            key_range: TableKeyRange,
1339            options: ReadLogOptions,
1340        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1341            (*self.0).iter_log(epoch_range, key_range, options)
1342        }
1343    }
1344
1345    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1346
1347    impl AsHummock for StateStoreDynRef {
1348        fn as_hummock(&self) -> Option<&HummockStorage> {
1349            (*self.0).as_hummock()
1350        }
1351    }
1352
1353    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1354
1355    impl StateStore for StateStoreDynRef {
1356        type Local = BoxDynLocalStateStore;
1357        type ReadSnapshot = StateStoreReadDynRef;
1358
1359        fn try_wait_epoch(
1360            &self,
1361            epoch: HummockReadEpoch,
1362            options: TryWaitEpochOptions,
1363        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1364            (*self.0).try_wait_epoch(epoch, options)
1365        }
1366
1367        fn new_local(
1368            &self,
1369            option: NewLocalOptions,
1370        ) -> impl Future<Output = Self::Local> + Send + '_ {
1371            (*self.0).new_local(option)
1372        }
1373
1374        async fn new_read_snapshot(
1375            &self,
1376            epoch: HummockReadEpoch,
1377            options: NewReadSnapshotOptions,
1378        ) -> StorageResult<Self::ReadSnapshot> {
1379            (*self.0).new_read_snapshot(epoch, options).await
1380        }
1381    }
1382}