risingwave_storage/
store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions};
22use futures::FutureExt;
23use futures::future::BoxFuture;
24use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
25use risingwave_common::catalog::TableId;
26use risingwave_common::license::Feature;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use risingwave_common_service::RpcNotificationClient;
29use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
30use risingwave_object_store::object::build_remote_object_store;
31use thiserror_ext::AsReport;
32
33use crate::StateStore;
34use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
35use crate::error::StorageResult;
36use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
37use crate::hummock::{
38    Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
39    SstableBlockIndex, SstableStore, SstableStoreConfig,
40};
41use crate::memory::MemoryStateStore;
42use crate::memory::sled::SledStateStore;
43use crate::monitor::{
44    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
45    ObjectStoreMetrics,
46};
47use crate::opts::StorageOpts;
48
49static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
50    Box::new(PrometheusMetricsRegistry::new(
51        GLOBAL_METRICS_REGISTRY.clone(),
52    ))
53});
54
55mod opaque_type {
56    use super::*;
57
58    pub type HummockStorageType = impl StateStore + AsHummock;
59    pub type MemoryStateStoreType = impl StateStore + AsHummock;
60    pub type SledStateStoreType = impl StateStore + AsHummock;
61
62    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
63        may_dynamic_dispatch(state_store)
64    }
65
66    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
67        may_dynamic_dispatch(may_verify(state_store))
68    }
69
70    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
71        may_dynamic_dispatch(state_store)
72    }
73}
74pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
75use opaque_type::{hummock, in_memory, sled};
76
/// With the `hm-trace` feature: a [`MonitoredStateStore`] wrapped around a `TracedStateStore`
/// wrapped around the store `S`.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Without the `hm-trace` feature: a [`MonitoredStateStore`] wrapped directly around the store
/// `S`.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
82
83fn monitored<S: StateStore>(
84    state_store: S,
85    storage_metrics: Arc<MonitoredStorageMetrics>,
86) -> Monitored<S> {
87    let inner = {
88        #[cfg(feature = "hm-trace")]
89        {
90            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
91        }
92        #[cfg(not(feature = "hm-trace"))]
93        {
94            state_store
95        }
96    };
97    inner.monitored(storage_metrics)
98}
99
100fn inner<S>(state_store: &Monitored<S>) -> &S {
101    let inner = state_store.inner();
102    {
103        #[cfg(feature = "hm-trace")]
104        {
105            inner.inner()
106        }
107        #[cfg(not(feature = "hm-trace"))]
108        {
109            inner
110        }
111    }
112}
113
/// The type erased [`StateStore`].
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this
    /// state store misses some critical implementation to ensure the correctness of persisting
    /// streaming state. (e.g., no `read_epoch` support, no async checkpoint)
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// State store backed by [`SledStateStore`]. Like the memory variant, the
    /// `dispatch_state_store!` macro refuses to dispatch on it in release builds.
    SledStateStore(Monitored<SledStateStoreType>),
}
134
135fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
136    #[cfg(not(debug_assertions))]
137    {
138        state_store
139    }
140    #[cfg(debug_assertions)]
141    {
142        use crate::store_impl::dyn_state_store::StateStorePointer;
143        StateStorePointer(Arc::new(state_store) as _)
144    }
145}
146
147fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
148    #[cfg(not(debug_assertions))]
149    {
150        state_store
151    }
152    #[cfg(debug_assertions)]
153    {
154        use std::marker::PhantomData;
155
156        use risingwave_common::util::env_var::env_var_is_true;
157        use tracing::info;
158
159        use crate::store_impl::verify::VerifyStateStore;
160
161        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
162            info!("enable verify state store");
163            Some(SledStateStore::new_temp())
164        } else {
165            info!("verify state store is not enabled");
166            None
167        };
168        VerifyStateStore {
169            actual: state_store,
170            expected,
171            _phantom: PhantomData::<()>,
172        }
173    }
174}
175
176impl StateStoreImpl {
177    fn in_memory(
178        state_store: MemoryStateStore,
179        storage_metrics: Arc<MonitoredStorageMetrics>,
180    ) -> Self {
181        // The specific type of MemoryStateStoreType in deducted here.
182        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
183    }
184
185    pub fn hummock(
186        state_store: HummockStorage,
187        storage_metrics: Arc<MonitoredStorageMetrics>,
188    ) -> Self {
189        // The specific type of HummockStateStoreType in deducted here.
190        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
191    }
192
193    pub fn sled(
194        state_store: SledStateStore,
195        storage_metrics: Arc<MonitoredStorageMetrics>,
196    ) -> Self {
197        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
198    }
199
200    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
201        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
202    }
203
204    pub fn for_test() -> Self {
205        Self::in_memory(
206            MemoryStateStore::new(),
207            Arc::new(MonitoredStorageMetrics::unused()),
208        )
209    }
210
211    pub fn as_hummock(&self) -> Option<&HummockStorage> {
212        match self {
213            StateStoreImpl::HummockStateStore(hummock) => {
214                Some(inner(hummock).as_hummock().expect("should be hummock"))
215            }
216            _ => None,
217        }
218    }
219}
220
221impl Debug for StateStoreImpl {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        match self {
224            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
225            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
226            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
227        }
228    }
229}
230
/// Evaluates `$body` with `$store` bound to the store held by the matching variant of `$impl`.
///
/// The Hummock arm always expands `$body`. The memory and sled arms expand `$body` only when
/// `debug_assertions` is enabled; otherwise they expand to an `unimplemented!` call.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::HummockStateStore($store) => $body,

            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling the memory backend will cause
                // monomorphization explosion and thus slow compile time in release mode.
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
                #[cfg(debug_assertions)]
                {
                    $body
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling the sled backend will cause
                // monomorphization explosion and thus slow compile time in release mode.
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
                #[cfg(debug_assertions)]
                {
                    $body
                }
            }
        }
    }};
}
269
270#[cfg(any(debug_assertions, test, feature = "test"))]
271pub mod verify {
272    use std::fmt::Debug;
273    use std::future::Future;
274    use std::marker::PhantomData;
275    use std::ops::Deref;
276    use std::sync::Arc;
277
278    use bytes::Bytes;
279    use risingwave_common::bitmap::Bitmap;
280    use risingwave_common::hash::VirtualNode;
281    use risingwave_hummock_sdk::HummockReadEpoch;
282    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
283    use tracing::log::warn;
284
285    use crate::error::StorageResult;
286    use crate::hummock::HummockStorage;
287    use crate::store::*;
288    use crate::store_impl::AsHummock;
289
290    #[expect(dead_code)]
291    fn assert_result_eq<Item: PartialEq + Debug, E>(
292        first: &std::result::Result<Item, E>,
293        second: &std::result::Result<Item, E>,
294    ) {
295        match (first, second) {
296            (Ok(first), Ok(second)) => {
297                if first != second {
298                    warn!("result different: {:?} {:?}", first, second);
299                }
300                assert_eq!(first, second);
301            }
302            (Err(_), Err(_)) => {}
303            _ => {
304                warn!("one success and one failed");
305                panic!("result not equal");
306            }
307        }
308    }
309
    /// Pairs an `actual` value with an optional `expected` one so that their results can be
    /// compared.
    ///
    /// The same struct is used for stores, local stores, snapshots and iterators in this
    /// module; `T` is the iterator item type when it wraps iterators and `()` otherwise.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The value whose results are returned to the caller.
        pub actual: A,
        /// The reference value to compare against; `None` disables the comparison.
        pub expected: Option<E>,
        /// Marker carrying `T` without storing a value of that type.
        pub _phantom: PhantomData<T>,
    }
316
317    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
318        fn as_hummock(&self) -> Option<&HummockStorage> {
319            self.actual.as_hummock()
320        }
321    }
322
323    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
324        async fn on_key_value<O: Send + 'static>(
325            &self,
326            key: TableKey<Bytes>,
327            read_options: ReadOptions,
328            on_key_value_fn: impl KeyValueFn<O>,
329        ) -> StorageResult<Option<O>> {
330            let actual: Option<(FullKey<Bytes>, Bytes)> = self
331                .actual
332                .on_key_value(key.clone(), read_options.clone(), |key, value| {
333                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
334                })
335                .await?;
336            if let Some(expected) = &self.expected {
337                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
338                    .on_key_value(key, read_options, |key, value| {
339                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
340                    })
341                    .await?;
342                assert_eq!(
343                    actual
344                        .as_ref()
345                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
346                    expected
347                        .as_ref()
348                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
349                );
350            }
351
352            actual
353                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
354                .transpose()
355        }
356    }
357
358    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
359        for VerifyStateStore<A, E>
360    {
361        fn nearest<O: Send + 'static>(
362            &self,
363            vec: Vector,
364            options: VectorNearestOptions,
365            on_nearest_item_fn: impl OnNearestItemFn<O>,
366        ) -> impl StorageFuture<'_, Vec<O>> {
367            self.actual.nearest(vec, options, on_nearest_item_fn)
368        }
369    }
370
371    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
372        type Iter = impl StateStoreReadIter;
373        type RevIter = impl StateStoreReadIter;
374
375        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
376        // fail to compile.
377        #[expect(clippy::manual_async_fn)]
378        fn iter(
379            &self,
380            key_range: TableKeyRange,
381
382            read_options: ReadOptions,
383        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
384            async move {
385                let actual = self
386                    .actual
387                    .iter(key_range.clone(), read_options.clone())
388                    .await?;
389                let expected = if let Some(expected) = &self.expected {
390                    Some(expected.iter(key_range, read_options).await?)
391                } else {
392                    None
393                };
394
395                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
396            }
397        }
398
399        #[expect(clippy::manual_async_fn)]
400        fn rev_iter(
401            &self,
402            key_range: TableKeyRange,
403
404            read_options: ReadOptions,
405        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
406            async move {
407                let actual = self
408                    .actual
409                    .rev_iter(key_range.clone(), read_options.clone())
410                    .await?;
411                let expected = if let Some(expected) = &self.expected {
412                    Some(expected.rev_iter(key_range, read_options).await?)
413                } else {
414                    None
415                };
416
417                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
418            }
419        }
420    }
421
422    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
423        type ChangeLogIter = impl StateStoreReadChangeLogIter;
424
425        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
426            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
427            if let Some(expected) = &self.expected {
428                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
429            }
430            Ok(actual)
431        }
432
433        async fn iter_log(
434            &self,
435            epoch_range: (u64, u64),
436            key_range: TableKeyRange,
437            options: ReadLogOptions,
438        ) -> StorageResult<Self::ChangeLogIter> {
439            let actual = self
440                .actual
441                .iter_log(epoch_range, key_range.clone(), options.clone())
442                .await?;
443            let expected = if let Some(expected) = &self.expected {
444                Some(expected.iter_log(epoch_range, key_range, options).await?)
445            } else {
446                None
447            };
448
449            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
450        }
451    }
452
453    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
454        for VerifyStateStore<A, E, T>
455    where
456        for<'a> T::ItemRef<'a>: PartialEq + Debug,
457    {
458        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
459            let actual = self.actual.try_next().await?;
460            if let Some(expected) = self.expected.as_mut() {
461                let expected = expected.try_next().await?;
462                assert_eq!(actual, expected);
463            }
464            Ok(actual)
465        }
466    }
467
468    fn verify_iter<T: IterItem>(
469        actual: impl StateStoreIter<T>,
470        expected: Option<impl StateStoreIter<T>>,
471    ) -> impl StateStoreIter<T>
472    where
473        for<'a> T::ItemRef<'a>: PartialEq + Debug,
474    {
475        VerifyStateStore {
476            actual,
477            expected,
478            _phantom: PhantomData::<T>,
479        }
480    }
481
482    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
483        type FlushedSnapshotReader =
484            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
485
486        type Iter<'a> = impl StateStoreIter + 'a;
487        type RevIter<'a> = impl StateStoreIter + 'a;
488
489        #[expect(clippy::manual_async_fn)]
490        fn iter(
491            &self,
492            key_range: TableKeyRange,
493            read_options: ReadOptions,
494        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
495            async move {
496                let actual = self
497                    .actual
498                    .iter(key_range.clone(), read_options.clone())
499                    .await?;
500                let expected = if let Some(expected) = &self.expected {
501                    Some(expected.iter(key_range, read_options).await?)
502                } else {
503                    None
504                };
505
506                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
507            }
508        }
509
510        #[expect(clippy::manual_async_fn)]
511        fn rev_iter(
512            &self,
513            key_range: TableKeyRange,
514            read_options: ReadOptions,
515        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
516            async move {
517                let actual = self
518                    .actual
519                    .rev_iter(key_range.clone(), read_options.clone())
520                    .await?;
521                let expected = if let Some(expected) = &self.expected {
522                    Some(expected.rev_iter(key_range, read_options).await?)
523                } else {
524                    None
525                };
526
527                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
528            }
529        }
530
531        fn insert(
532            &mut self,
533            key: TableKey<Bytes>,
534            new_val: Bytes,
535            old_val: Option<Bytes>,
536        ) -> StorageResult<()> {
537            if let Some(expected) = &mut self.expected {
538                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
539            }
540            self.actual.insert(key, new_val, old_val)?;
541
542            Ok(())
543        }
544
545        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
546            if let Some(expected) = &mut self.expected {
547                expected.delete(key.clone(), old_val.clone())?;
548            }
549            self.actual.delete(key, old_val)?;
550            Ok(())
551        }
552
553        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
554            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
555            if let Some(expected) = &mut self.expected {
556                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
557            }
558            Ok(ret)
559        }
560
561        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
562            let ret = self.actual.get_table_watermark(vnode);
563            if let Some(expected) = &self.expected {
564                assert_eq!(ret, expected.get_table_watermark(vnode));
565            }
566            ret
567        }
568
569        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
570            VerifyStateStore {
571                actual: self.actual.new_flushed_snapshot_reader(),
572                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
573                _phantom: Default::default(),
574            }
575        }
576    }
577
578    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
579        for VerifyStateStore<A, E>
580    {
581        async fn flush(&mut self) -> StorageResult<usize> {
582            if let Some(expected) = &mut self.expected {
583                expected.flush().await?;
584            }
585            self.actual.flush().await
586        }
587
588        async fn try_flush(&mut self) -> StorageResult<()> {
589            if let Some(expected) = &mut self.expected {
590                expected.try_flush().await?;
591            }
592            self.actual.try_flush().await
593        }
594
595        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
596            self.actual.init(options.clone()).await?;
597            if let Some(expected) = &mut self.expected {
598                expected.init(options).await?;
599            }
600            Ok(())
601        }
602
603        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
604            if let Some(expected) = &mut self.expected {
605                expected.seal_current_epoch(next_epoch, opts.clone());
606            }
607            self.actual.seal_current_epoch(next_epoch, opts);
608        }
609    }
610
611    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
612        type Local = VerifyStateStore<A::Local, E::Local>;
613        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
614        type VectorWriter = A::VectorWriter;
615
616        fn try_wait_epoch(
617            &self,
618            epoch: HummockReadEpoch,
619            options: TryWaitEpochOptions,
620        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
621            self.actual.try_wait_epoch(epoch, options)
622        }
623
624        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
625            let expected = if let Some(expected) = &self.expected {
626                Some(expected.new_local(option.clone()).await)
627            } else {
628                None
629            };
630            VerifyStateStore {
631                actual: self.actual.new_local(option).await,
632                expected,
633                _phantom: PhantomData::<()>,
634            }
635        }
636
637        async fn new_read_snapshot(
638            &self,
639            epoch: HummockReadEpoch,
640            options: NewReadSnapshotOptions,
641        ) -> StorageResult<Self::ReadSnapshot> {
642            let expected = if let Some(expected) = &self.expected {
643                Some(expected.new_read_snapshot(epoch, options).await?)
644            } else {
645                None
646            };
647            Ok(VerifyStateStore {
648                actual: self.actual.new_read_snapshot(epoch, options).await?,
649                expected,
650                _phantom: PhantomData::<()>,
651            })
652        }
653
654        fn new_vector_writer(
655            &self,
656            options: NewVectorWriterOptions,
657        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
658            self.actual.new_vector_writer(options)
659        }
660    }
661
662    impl<A, E> Deref for VerifyStateStore<A, E> {
663        type Target = A;
664
665        fn deref(&self) -> &Self::Target {
666            &self.actual
667        }
668    }
669}
670
671impl StateStoreImpl {
672    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
673    #[allow(clippy::too_many_arguments)]
674    #[expect(clippy::borrowed_box)]
675    pub async fn new(
676        s: &str,
677        opts: Arc<StorageOpts>,
678        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
679        state_store_metrics: Arc<HummockStateStoreMetrics>,
680        object_store_metrics: Arc<ObjectStoreMetrics>,
681        storage_metrics: Arc<MonitoredStorageMetrics>,
682        compactor_metrics: Arc<CompactorMetrics>,
683        await_tree_config: Option<await_tree::Config>,
684        use_new_object_prefix_strategy: bool,
685    ) -> StorageResult<Self> {
686        const KB: usize = 1 << 10;
687        const MB: usize = 1 << 20;
688
689        let meta_cache = {
690            let mut builder = HybridCacheBuilder::new()
691                .with_name("foyer.meta")
692                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
693                .memory(opts.meta_cache_capacity_mb * MB)
694                .with_shards(opts.meta_cache_shard_num)
695                .with_eviction_config(opts.meta_cache_eviction_config.clone())
696                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
697                    u64::BITS as usize / 8 + value.estimate_size()
698                })
699                .storage(Engine::Large);
700
701            if !opts.meta_file_cache_dir.is_empty() {
702                if let Err(e) = Feature::ElasticDiskCache.check_available() {
703                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
704                } else {
705                    builder = builder
706                        .with_device_options(
707                            DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
708                                .with_capacity(opts.meta_file_cache_capacity_mb * MB)
709                                .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
710                                .with_throttle(opts.meta_file_cache_throttle.clone()),
711                        )
712                        .with_recover_mode(opts.meta_file_cache_recover_mode)
713                        .with_compression(opts.meta_file_cache_compression)
714                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone())
715                        .with_large_object_disk_cache_options(
716                            LargeEngineOptions::new()
717                                .with_indexer_shards(opts.meta_file_cache_indexer_shards)
718                                .with_flushers(opts.meta_file_cache_flushers)
719                                .with_reclaimers(opts.meta_file_cache_reclaimers)
720                                .with_buffer_pool_size(
721                                    opts.meta_file_cache_flush_buffer_threshold_mb * MB,
722                                ) // 128 MiB
723                                .with_clean_region_threshold(
724                                    opts.meta_file_cache_reclaimers
725                                        + opts.meta_file_cache_reclaimers / 2,
726                                )
727                                .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
728                                .with_blob_index_size(16 * KB)
729                                .with_eviction_pickers(vec![Box::new(FifoPicker::new(
730                                    opts.meta_file_cache_fifo_probation_ratio,
731                                ))]),
732                        );
733                }
734            }
735
736            builder.build().await.map_err(HummockError::foyer_error)?
737        };
738
739        let block_cache = {
740            let mut builder = HybridCacheBuilder::new()
741                .with_name("foyer.data")
742                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
743                .with_event_listener(Arc::new(BlockCacheEventListener::new(
744                    state_store_metrics.clone(),
745                )))
746                .memory(opts.block_cache_capacity_mb * MB)
747                .with_shards(opts.block_cache_shard_num)
748                .with_eviction_config(opts.block_cache_eviction_config.clone())
749                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
750                    // FIXME(MrCroxx): Calculate block weight more accurately.
751                    u64::BITS as usize * 2 / 8 + value.raw().len()
752                })
753                .storage(Engine::Large);
754
755            if !opts.data_file_cache_dir.is_empty() {
756                if let Err(e) = Feature::ElasticDiskCache.check_available() {
757                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
758                } else {
759                    builder = builder
760                        .with_device_options(
761                            DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
762                                .with_capacity(opts.data_file_cache_capacity_mb * MB)
763                                .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
764                                .with_throttle(opts.data_file_cache_throttle.clone()),
765                        )
766                        .with_recover_mode(opts.data_file_cache_recover_mode)
767                        .with_compression(opts.data_file_cache_compression)
768                        .with_runtime_options(opts.data_file_cache_runtime_config.clone())
769                        .with_large_object_disk_cache_options(
770                            LargeEngineOptions::new()
771                                .with_indexer_shards(opts.data_file_cache_indexer_shards)
772                                .with_flushers(opts.data_file_cache_flushers)
773                                .with_reclaimers(opts.data_file_cache_reclaimers)
774                                .with_buffer_pool_size(
775                                    opts.data_file_cache_flush_buffer_threshold_mb * MB,
776                                ) // 128 MiB
777                                .with_clean_region_threshold(
778                                    opts.data_file_cache_reclaimers
779                                        + opts.data_file_cache_reclaimers / 2,
780                                )
781                                .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
782                                .with_blob_index_size(16 * KB)
783                                .with_eviction_pickers(vec![Box::new(FifoPicker::new(
784                                    opts.data_file_cache_fifo_probation_ratio,
785                                ))]),
786                        );
787                }
788            }
789
790            builder.build().await.map_err(HummockError::foyer_error)?
791        };
792
793        let recent_filter = if opts.data_file_cache_dir.is_empty() {
794            None
795        } else {
796            Some(Arc::new(RecentFilter::new(
797                opts.cache_refill_recent_filter_layers,
798                Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
799            )))
800        };
801
802        let store = match s {
803            hummock if hummock.starts_with("hummock+") => {
804                let object_store = build_remote_object_store(
805                    hummock.strip_prefix("hummock+").unwrap(),
806                    object_store_metrics.clone(),
807                    "Hummock",
808                    Arc::new(opts.object_store_config.clone()),
809                )
810                .await;
811
812                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
813                    store: Arc::new(object_store),
814                    path: opts.data_directory.clone(),
815                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
816                    max_prefetch_block_number: opts.max_prefetch_block_number,
817                    recent_filter,
818                    state_store_metrics: state_store_metrics.clone(),
819                    use_new_object_prefix_strategy,
820
821                    meta_cache,
822                    block_cache,
823                }));
824                let notification_client =
825                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
826                let compaction_catalog_manager_ref =
827                    Arc::new(CompactionCatalogManager::new(Box::new(
828                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
829                    )));
830
831                let inner = HummockStorage::new(
832                    opts.clone(),
833                    sstable_store,
834                    hummock_meta_client.clone(),
835                    notification_client,
836                    compaction_catalog_manager_ref,
837                    state_store_metrics.clone(),
838                    compactor_metrics.clone(),
839                    await_tree_config,
840                )
841                .await?;
842
843                StateStoreImpl::hummock(inner, storage_metrics)
844            }
845
846            "in_memory" | "in-memory" => {
847                tracing::warn!(
848                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
849                );
850                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
851            }
852
853            sled if sled.starts_with("sled://") => {
854                tracing::warn!(
855                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
856                );
857                let path = sled.strip_prefix("sled://").unwrap();
858                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
859            }
860
861            other => unimplemented!("{} state store is not supported", other),
862        };
863
864        Ok(store)
865    }
866}
867
868pub trait AsHummock: Send + Sync {
869    fn as_hummock(&self) -> Option<&HummockStorage>;
870
871    fn sync(
872        &self,
873        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
874    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
875        async move {
876            if let Some(hummock) = self.as_hummock() {
877                hummock.sync(sync_table_epochs).await
878            } else {
879                Ok(SyncResult::default())
880            }
881        }
882        .boxed()
883    }
884}
885
886impl AsHummock for HummockStorage {
887    fn as_hummock(&self) -> Option<&HummockStorage> {
888        Some(self)
889    }
890}
891
892impl AsHummock for MemoryStateStore {
893    fn as_hummock(&self) -> Option<&HummockStorage> {
894        None
895    }
896}
897
898impl AsHummock for SledStateStore {
899    fn as_hummock(&self) -> Option<&HummockStorage> {
900        None
901    }
902}
903
904#[cfg(debug_assertions)]
905mod dyn_state_store {
906    use std::future::Future;
907    use std::ops::DerefMut;
908    use std::sync::Arc;
909
910    use bytes::Bytes;
911    use risingwave_common::bitmap::Bitmap;
912    use risingwave_common::hash::VirtualNode;
913    use risingwave_hummock_sdk::HummockReadEpoch;
914    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
915
916    use crate::error::StorageResult;
917    use crate::hummock::HummockStorage;
918    use crate::store::*;
919    use crate::store_impl::AsHummock;
920    use crate::vector::VectorDistance;
921
922    #[async_trait::async_trait]
923    pub trait DynStateStoreIter<T: IterItem>: Send {
924        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
925    }
926
927    #[async_trait::async_trait]
928    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
929        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
930            self.try_next().await
931        }
932    }
933
934    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
935    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
936        fn try_next(
937            &mut self,
938        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
939            self.deref_mut().try_next()
940        }
941    }
942
943    // For StateStoreRead
944
945    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
946    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
947
948    #[async_trait::async_trait]
949    pub trait DynStateStoreGet: StaticSendSync {
950        async fn get_keyed_row(
951            &self,
952            key: TableKey<Bytes>,
953            read_options: ReadOptions,
954        ) -> StorageResult<Option<StateStoreKeyedRow>>;
955    }
956
957    #[async_trait::async_trait]
958    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
959        async fn iter(
960            &self,
961            key_range: TableKeyRange,
962
963            read_options: ReadOptions,
964        ) -> StorageResult<BoxStateStoreReadIter>;
965
966        async fn rev_iter(
967            &self,
968            key_range: TableKeyRange,
969
970            read_options: ReadOptions,
971        ) -> StorageResult<BoxStateStoreReadIter>;
972    }
973
974    #[async_trait::async_trait]
975    pub trait DynStateStoreReadLog: StaticSendSync {
976        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
977        async fn iter_log(
978            &self,
979            epoch_range: (u64, u64),
980            key_range: TableKeyRange,
981            options: ReadLogOptions,
982        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
983    }
984
985    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
986
987    #[async_trait::async_trait]
988    impl<S: StateStoreGet> DynStateStoreGet for S {
989        async fn get_keyed_row(
990            &self,
991            key: TableKey<Bytes>,
992            read_options: ReadOptions,
993        ) -> StorageResult<Option<StateStoreKeyedRow>> {
994            self.on_key_value(key, read_options, move |key, value| {
995                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
996            })
997            .await
998        }
999    }
1000
1001    #[async_trait::async_trait]
1002    impl<S: StateStoreRead> DynStateStoreRead for S {
1003        async fn iter(
1004            &self,
1005            key_range: TableKeyRange,
1006
1007            read_options: ReadOptions,
1008        ) -> StorageResult<BoxStateStoreReadIter> {
1009            Ok(Box::new(self.iter(key_range, read_options).await?))
1010        }
1011
1012        async fn rev_iter(
1013            &self,
1014            key_range: TableKeyRange,
1015
1016            read_options: ReadOptions,
1017        ) -> StorageResult<BoxStateStoreReadIter> {
1018            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1019        }
1020    }
1021
1022    #[async_trait::async_trait]
1023    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1024        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1025            self.next_epoch(epoch, options).await
1026        }
1027
1028        async fn iter_log(
1029            &self,
1030            epoch_range: (u64, u64),
1031            key_range: TableKeyRange,
1032            options: ReadLogOptions,
1033        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1034            Ok(Box::new(
1035                self.iter_log(epoch_range, key_range, options).await?,
1036            ))
1037        }
1038    }
1039
1040    // For LocalStateStore
1041    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1042    #[async_trait::async_trait]
1043    pub trait DynLocalStateStore:
1044        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
1045    {
1046        async fn iter(
1047            &self,
1048            key_range: TableKeyRange,
1049            read_options: ReadOptions,
1050        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1051
1052        async fn rev_iter(
1053            &self,
1054            key_range: TableKeyRange,
1055            read_options: ReadOptions,
1056        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1057
1058        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1059
1060        fn insert(
1061            &mut self,
1062            key: TableKey<Bytes>,
1063            new_val: Bytes,
1064            old_val: Option<Bytes>,
1065        ) -> StorageResult<()>;
1066
1067        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1068
1069        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1070
1071        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1072    }
1073
1074    #[async_trait::async_trait]
1075    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1076        async fn flush(&mut self) -> StorageResult<usize>;
1077
1078        async fn try_flush(&mut self) -> StorageResult<()>;
1079
1080        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1081
1082        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1083    }
1084
1085    #[async_trait::async_trait]
1086    impl<S: LocalStateStore> DynLocalStateStore for S {
1087        async fn iter(
1088            &self,
1089            key_range: TableKeyRange,
1090            read_options: ReadOptions,
1091        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1092            Ok(Box::new(self.iter(key_range, read_options).await?))
1093        }
1094
1095        async fn rev_iter(
1096            &self,
1097            key_range: TableKeyRange,
1098            read_options: ReadOptions,
1099        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1100            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1101        }
1102
1103        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1104            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1105        }
1106
1107        fn insert(
1108            &mut self,
1109            key: TableKey<Bytes>,
1110            new_val: Bytes,
1111            old_val: Option<Bytes>,
1112        ) -> StorageResult<()> {
1113            self.insert(key, new_val, old_val)
1114        }
1115
1116        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1117            self.delete(key, old_val)
1118        }
1119
1120        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1121            self.update_vnode_bitmap(vnodes).await
1122        }
1123
1124        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1125            self.get_table_watermark(vnode)
1126        }
1127    }
1128
1129    #[async_trait::async_trait]
1130    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1131        async fn flush(&mut self) -> StorageResult<usize> {
1132            self.flush().await
1133        }
1134
1135        async fn try_flush(&mut self) -> StorageResult<()> {
1136            self.try_flush().await
1137        }
1138
1139        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1140            self.init(options).await
1141        }
1142
1143        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1144            self.seal_current_epoch(next_epoch, opts)
1145        }
1146    }
1147
1148    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1149
1150    impl LocalStateStore for BoxDynLocalStateStore {
1151        type FlushedSnapshotReader = StateStoreReadDynRef;
1152        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1153        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1154
1155        fn iter(
1156            &self,
1157            key_range: TableKeyRange,
1158            read_options: ReadOptions,
1159        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1160            (*self.0).iter(key_range, read_options)
1161        }
1162
1163        fn rev_iter(
1164            &self,
1165            key_range: TableKeyRange,
1166            read_options: ReadOptions,
1167        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1168            (*self.0).rev_iter(key_range, read_options)
1169        }
1170
1171        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1172            (*self.0).new_flushed_snapshot_reader()
1173        }
1174
1175        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1176            (*self.0).get_table_watermark(vnode)
1177        }
1178
1179        fn insert(
1180            &mut self,
1181            key: TableKey<Bytes>,
1182            new_val: Bytes,
1183            old_val: Option<Bytes>,
1184        ) -> StorageResult<()> {
1185            (*self.0).insert(key, new_val, old_val)
1186        }
1187
1188        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1189            (*self.0).delete(key, old_val)
1190        }
1191
1192        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1193            (*self.0).update_vnode_bitmap(vnodes).await
1194        }
1195    }
1196
1197    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1198    where
1199        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1200    {
1201        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1202            self.as_mut().flush()
1203        }
1204
1205        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1206            self.as_mut().try_flush()
1207        }
1208
1209        fn init(
1210            &mut self,
1211            options: InitOptions,
1212        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1213            self.as_mut().init(options)
1214        }
1215
1216        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1217            self.as_mut().seal_current_epoch(next_epoch, opts)
1218        }
1219    }
1220
1221    #[async_trait::async_trait]
1222    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1223        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
1224    }
1225
1226    #[async_trait::async_trait]
1227    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1228        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1229            self.insert(vec, info)
1230        }
1231    }
1232
1233    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1234
1235    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1236        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1237            self.0.insert(vec, info)
1238        }
1239    }
1240
1241    // For global StateStore
1242
1243    #[async_trait::async_trait]
1244    pub trait DynStateStoreReadVector: StaticSendSync {
1245        async fn nearest(
1246            &self,
1247            vec: Vector,
1248            options: VectorNearestOptions,
1249        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1250    }
1251
1252    #[async_trait::async_trait]
1253    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1254        async fn nearest(
1255            &self,
1256            vec: Vector,
1257            options: VectorNearestOptions,
1258        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1259            self.nearest(vec, options, |vec, distance, info| {
1260                (
1261                    Vector::clone_from_ref(vec),
1262                    distance,
1263                    Bytes::copy_from_slice(info),
1264                )
1265            })
1266            .await
1267        }
1268    }
1269
1270    impl<P> StateStoreReadVector for StateStorePointer<P>
1271    where
1272        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1273    {
1274        async fn nearest<O: Send + 'static>(
1275            &self,
1276            vec: Vector,
1277            options: VectorNearestOptions,
1278            on_nearest_item_fn: impl OnNearestItemFn<O>,
1279        ) -> StorageResult<Vec<O>> {
1280            let output = self.as_ref().nearest(vec, options).await?;
1281            Ok(output
1282                .into_iter()
1283                .map(|(vec, distance, info)| {
1284                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1285                })
1286                .collect())
1287        }
1288    }
1289
1290    pub trait DynStateStoreReadSnapshot:
1291        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1292    {
1293    }
1294
1295    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1296        for S
1297    {
1298    }
1299
1300    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1301    #[async_trait::async_trait]
1302    pub trait DynStateStoreExt: StaticSendSync {
1303        async fn try_wait_epoch(
1304            &self,
1305            epoch: HummockReadEpoch,
1306            options: TryWaitEpochOptions,
1307        ) -> StorageResult<()>;
1308
1309        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1310        async fn new_read_snapshot(
1311            &self,
1312            epoch: HummockReadEpoch,
1313            options: NewReadSnapshotOptions,
1314        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1315        async fn new_vector_writer(
1316            &self,
1317            options: NewVectorWriterOptions,
1318        ) -> BoxDynStateStoreWriteVector;
1319    }
1320
1321    #[async_trait::async_trait]
1322    impl<S: StateStore> DynStateStoreExt for S {
1323        async fn try_wait_epoch(
1324            &self,
1325            epoch: HummockReadEpoch,
1326            options: TryWaitEpochOptions,
1327        ) -> StorageResult<()> {
1328            self.try_wait_epoch(epoch, options).await
1329        }
1330
1331        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1332            StateStorePointer(Box::new(self.new_local(option).await))
1333        }
1334
1335        async fn new_read_snapshot(
1336            &self,
1337            epoch: HummockReadEpoch,
1338            options: NewReadSnapshotOptions,
1339        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1340            Ok(StateStorePointer(Arc::new(
1341                self.new_read_snapshot(epoch, options).await?,
1342            )))
1343        }
1344
1345        async fn new_vector_writer(
1346            &self,
1347            options: NewVectorWriterOptions,
1348        ) -> BoxDynStateStoreWriteVector {
1349            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1350        }
1351    }
1352
1353    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1354
1355    macro_rules! state_store_pointer_dyn_as_ref {
1356        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1357            impl AsRef<dyn $target_dyn_trait>
1358                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1359            {
1360                fn as_ref(&self) -> &dyn $target_dyn_trait {
1361                    (&*self.0) as _
1362                }
1363            }
1364        };
1365    }
1366
1367    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1368    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1369    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1370    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1371    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1372    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1373
1374    macro_rules! state_store_pointer_dyn_as_mut {
1375        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1376            impl AsMut<dyn $target_dyn_trait>
1377                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1378            {
1379                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1380                    (&mut *self.0) as _
1381                }
1382            }
1383        };
1384    }
1385
1386    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1387    state_store_pointer_dyn_as_mut!(
1388        Box<dyn DynStateStoreWriteVector>,
1389        DynStateStoreWriteEpochControl
1390    );
1391
1392    #[derive(Clone)]
1393    pub struct StateStorePointer<P>(pub(crate) P);
1394
1395    impl<P> StateStoreGet for StateStorePointer<P>
1396    where
1397        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1398    {
1399        async fn on_key_value<O: Send + 'static>(
1400            &self,
1401            key: TableKey<Bytes>,
1402            read_options: ReadOptions,
1403            on_key_value_fn: impl KeyValueFn<O>,
1404        ) -> StorageResult<Option<O>> {
1405            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1406            option
1407                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1408                .transpose()
1409        }
1410    }
1411
1412    impl<P> StateStoreRead for StateStorePointer<P>
1413    where
1414        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1415    {
1416        type Iter = BoxStateStoreReadIter;
1417        type RevIter = BoxStateStoreReadIter;
1418
1419        fn iter(
1420            &self,
1421            key_range: TableKeyRange,
1422
1423            read_options: ReadOptions,
1424        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1425            self.as_ref().iter(key_range, read_options)
1426        }
1427
1428        fn rev_iter(
1429            &self,
1430            key_range: TableKeyRange,
1431
1432            read_options: ReadOptions,
1433        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1434            self.as_ref().rev_iter(key_range, read_options)
1435        }
1436    }
1437
1438    impl StateStoreReadLog for StateStoreDynRef {
1439        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1440
1441        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1442            (*self.0).next_epoch(epoch, options).await
1443        }
1444
1445        fn iter_log(
1446            &self,
1447            epoch_range: (u64, u64),
1448            key_range: TableKeyRange,
1449            options: ReadLogOptions,
1450        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1451            (*self.0).iter_log(epoch_range, key_range, options)
1452        }
1453    }
1454
1455    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1456
1457    impl AsHummock for StateStoreDynRef {
1458        fn as_hummock(&self) -> Option<&HummockStorage> {
1459            (*self.0).as_hummock()
1460        }
1461    }
1462
1463    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1464
1465    impl StateStore for StateStoreDynRef {
1466        type Local = BoxDynLocalStateStore;
1467        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1468        type VectorWriter = BoxDynStateStoreWriteVector;
1469
1470        fn try_wait_epoch(
1471            &self,
1472            epoch: HummockReadEpoch,
1473            options: TryWaitEpochOptions,
1474        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1475            (*self.0).try_wait_epoch(epoch, options)
1476        }
1477
1478        fn new_local(
1479            &self,
1480            option: NewLocalOptions,
1481        ) -> impl Future<Output = Self::Local> + Send + '_ {
1482            (*self.0).new_local(option)
1483        }
1484
1485        async fn new_read_snapshot(
1486            &self,
1487            epoch: HummockReadEpoch,
1488            options: NewReadSnapshotOptions,
1489        ) -> StorageResult<Self::ReadSnapshot> {
1490            (*self.0).new_read_snapshot(epoch, options).await
1491        }
1492
1493        fn new_vector_writer(
1494            &self,
1495            options: NewVectorWriterOptions,
1496        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1497            (*self.0).new_vector_writer(options)
1498        }
1499    }
1500}