risingwave_storage/
store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23    HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46    SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52    ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
56static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
57    Box::new(PrometheusMetricsRegistry::new(
58        GLOBAL_METRICS_REGISTRY.clone(),
59    ))
60});
61
/// Hosts the opaque `impl StateStore + AsHummock` type aliases together with the functions
/// that are marked (via `#[define_opaque]`) as defining their hidden types.
mod opaque_type {
    use super::*;

    /// Opaque store type returned by [`hummock`].
    pub type HummockStorageType = impl StateStore + AsHummock;
    /// Opaque store type returned by [`in_memory`].
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    /// Opaque store type returned by [`sled`].
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Passes the in-memory store through `may_dynamic_dispatch`.
    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Passes the Hummock store through `may_verify` and then `may_dynamic_dispatch`.
    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Passes the sled store through `may_dynamic_dispatch`.
    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
/// Wrapper stack used for the stores held by `StateStoreImpl` when the `hm-trace` feature is
/// enabled: the store is wrapped in a `TracedStateStore`, which is wrapped in a
/// `MonitoredStateStore`.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Wrapper stack used for the stores held by `StateStoreImpl` when the `hm-trace` feature is
/// disabled: the store is wrapped directly in a `MonitoredStateStore`.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
92
93fn monitored<S: StateStore>(
94    state_store: S,
95    storage_metrics: Arc<MonitoredStorageMetrics>,
96) -> Monitored<S> {
97    let inner = {
98        #[cfg(feature = "hm-trace")]
99        {
100            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
101        }
102        #[cfg(not(feature = "hm-trace"))]
103        {
104            state_store
105        }
106    };
107    inner.monitored(storage_metrics)
108}
109
110fn inner<S>(state_store: &Monitored<S>) -> &S {
111    let inner = state_store.inner();
112    {
113        #[cfg(feature = "hm-trace")]
114        {
115            inner.inner()
116        }
117        #[cfg(not(feature = "hm-trace"))]
118        {
119            inner
120        }
121    }
122}
123
/// The type erased [`StateStore`].
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this
    /// state store misses some critical implementation to ensure the correctness of persisting
    /// streaming state. (e.g., no `read_epoch` support, no async checkpoint)
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// State store backed by [`SledStateStore`]. Like the in-memory variant, the
    /// `dispatch_state_store!` macro refuses to dispatch to it in builds without
    /// `debug_assertions`.
    SledStateStore(Monitored<SledStateStoreType>),
}
144
145fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
146    #[cfg(not(debug_assertions))]
147    {
148        state_store
149    }
150    #[cfg(debug_assertions)]
151    {
152        use crate::store_impl::dyn_state_store::StateStorePointer;
153        StateStorePointer(Arc::new(state_store) as _)
154    }
155}
156
157fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
158    #[cfg(not(debug_assertions))]
159    {
160        state_store
161    }
162    #[cfg(debug_assertions)]
163    {
164        use std::marker::PhantomData;
165
166        use risingwave_common::util::env_var::env_var_is_true;
167        use tracing::info;
168
169        use crate::store_impl::verify::VerifyStateStore;
170
171        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
172            info!("enable verify state store");
173            Some(SledStateStore::new_temp())
174        } else {
175            info!("verify state store is not enabled");
176            None
177        };
178        VerifyStateStore {
179            actual: state_store,
180            expected,
181            _phantom: PhantomData::<()>,
182        }
183    }
184}
185
186impl StateStoreImpl {
187    fn in_memory(
188        state_store: MemoryStateStore,
189        storage_metrics: Arc<MonitoredStorageMetrics>,
190    ) -> Self {
191        // The specific type of MemoryStateStoreType in deducted here.
192        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193    }
194
195    pub fn hummock(
196        state_store: HummockStorage,
197        storage_metrics: Arc<MonitoredStorageMetrics>,
198    ) -> Self {
199        // The specific type of HummockStateStoreType in deducted here.
200        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201    }
202
203    pub fn sled(
204        state_store: SledStateStore,
205        storage_metrics: Arc<MonitoredStorageMetrics>,
206    ) -> Self {
207        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208    }
209
210    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212    }
213
214    pub fn for_test() -> Self {
215        Self::in_memory(
216            MemoryStateStore::new(),
217            Arc::new(MonitoredStorageMetrics::unused()),
218        )
219    }
220
221    pub fn as_hummock(&self) -> Option<&HummockStorage> {
222        match self {
223            StateStoreImpl::HummockStateStore(hummock) => {
224                Some(inner(hummock).as_hummock().expect("should be hummock"))
225            }
226            _ => None,
227        }
228    }
229}
230
231impl Debug for StateStoreImpl {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237        }
238    }
239}
240
/// Dispatches on a `StateStoreImpl` value.
///
/// * `$impl`: expression evaluating to the `StateStoreImpl` to match on.
/// * `$store`: identifier bound to the store wrapped by the matched variant.
/// * `$body`: token tree expanded once per variant with `$store` in scope.
///
/// For the `MemoryStateStore` and `SledStateStore` variants, `$body` is only expanded when
/// `debug_assertions` is enabled; otherwise those arms expand to `unimplemented!`.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling memory backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling this backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            // The Hummock arm is the only one expanded in every build configuration.
            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
279
280#[cfg(any(debug_assertions, test, feature = "test"))]
281pub mod verify {
282    use std::fmt::Debug;
283    use std::future::Future;
284    use std::marker::PhantomData;
285    use std::ops::Deref;
286    use std::sync::Arc;
287
288    use bytes::Bytes;
289    use risingwave_common::array::VectorRef;
290    use risingwave_common::bitmap::Bitmap;
291    use risingwave_common::hash::VirtualNode;
292    use risingwave_hummock_sdk::HummockReadEpoch;
293    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
294    use tracing::log::warn;
295
296    use crate::error::StorageResult;
297    use crate::hummock::HummockStorage;
298    use crate::store::*;
299    use crate::store_impl::AsHummock;
300
301    #[expect(dead_code)]
302    fn assert_result_eq<Item: PartialEq + Debug, E>(
303        first: &std::result::Result<Item, E>,
304        second: &std::result::Result<Item, E>,
305    ) {
306        match (first, second) {
307            (Ok(first), Ok(second)) => {
308                if first != second {
309                    warn!("result different: {:?} {:?}", first, second);
310                }
311                assert_eq!(first, second);
312            }
313            (Err(_), Err(_)) => {}
314            _ => {
315                warn!("one success and one failed");
316                panic!("result not equal");
317            }
318        }
319    }
320
    /// Pairs an `actual` value with an optional `expected` counterpart.
    ///
    /// The trait impls in this module forward each operation to `actual` and, when `expected`
    /// is `Some`, to `expected` as well, asserting that comparable results are equal. The same
    /// struct is used for stores (`T = ()`) and for iterators yielding `T`.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The value whose results are returned to the caller.
        pub actual: A,
        /// Reference counterpart; `None` disables verification.
        pub expected: Option<E>,
        /// Carries the item type `T` without storing a value of it.
        pub _phantom: PhantomData<T>,
    }
327
328    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
329        fn as_hummock(&self) -> Option<&HummockStorage> {
330            self.actual.as_hummock()
331        }
332    }
333
334    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
335        async fn on_key_value<'a, O: Send + 'a>(
336            &'a self,
337            key: TableKey<Bytes>,
338            read_options: ReadOptions,
339            on_key_value_fn: impl KeyValueFn<'a, O>,
340        ) -> StorageResult<Option<O>> {
341            let actual: Option<(FullKey<Bytes>, Bytes)> = self
342                .actual
343                .on_key_value(key.clone(), read_options.clone(), |key, value| {
344                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
345                })
346                .await?;
347            if let Some(expected) = &self.expected {
348                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
349                    .on_key_value(key, read_options, |key, value| {
350                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
351                    })
352                    .await?;
353                assert_eq!(
354                    actual
355                        .as_ref()
356                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
357                    expected
358                        .as_ref()
359                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
360                );
361            }
362
363            actual
364                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
365                .transpose()
366        }
367    }
368
    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        /// Forwards the nearest-vector query to `actual` only; no comparison against
        /// `expected` is performed here.
        fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> impl StorageFuture<'a, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }
381
382    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
383        type Iter = impl StateStoreReadIter;
384        type RevIter = impl StateStoreReadIter;
385
386        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
387        // fail to compile.
388        #[expect(clippy::manual_async_fn)]
389        fn iter(
390            &self,
391            key_range: TableKeyRange,
392
393            read_options: ReadOptions,
394        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
395            async move {
396                let actual = self
397                    .actual
398                    .iter(key_range.clone(), read_options.clone())
399                    .await?;
400                let expected = if let Some(expected) = &self.expected {
401                    Some(expected.iter(key_range, read_options).await?)
402                } else {
403                    None
404                };
405
406                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
407            }
408        }
409
410        #[expect(clippy::manual_async_fn)]
411        fn rev_iter(
412            &self,
413            key_range: TableKeyRange,
414
415            read_options: ReadOptions,
416        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
417            async move {
418                let actual = self
419                    .actual
420                    .rev_iter(key_range.clone(), read_options.clone())
421                    .await?;
422                let expected = if let Some(expected) = &self.expected {
423                    Some(expected.rev_iter(key_range, read_options).await?)
424                } else {
425                    None
426                };
427
428                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
429            }
430        }
431    }
432
433    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
434        type ChangeLogIter = impl StateStoreReadChangeLogIter;
435
436        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
437            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
438            if let Some(expected) = &self.expected {
439                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
440            }
441            Ok(actual)
442        }
443
444        async fn iter_log(
445            &self,
446            epoch_range: (u64, u64),
447            key_range: TableKeyRange,
448            options: ReadLogOptions,
449        ) -> StorageResult<Self::ChangeLogIter> {
450            let actual = self
451                .actual
452                .iter_log(epoch_range, key_range.clone(), options.clone())
453                .await?;
454            let expected = if let Some(expected) = &self.expected {
455                Some(expected.iter_log(epoch_range, key_range, options).await?)
456            } else {
457                None
458            };
459
460            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
461        }
462    }
463
464    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
465        for VerifyStateStore<A, E, T>
466    where
467        for<'a> T::ItemRef<'a>: PartialEq + Debug,
468    {
469        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
470            let actual = self.actual.try_next().await?;
471            if let Some(expected) = self.expected.as_mut() {
472                let expected = expected.try_next().await?;
473                assert_eq!(actual, expected);
474            }
475            Ok(actual)
476        }
477    }
478
    /// Bundles an `actual` iterator with an optional `expected` iterator into a
    /// `VerifyStateStore`, returned as an opaque `StateStoreIter<T>`.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            // Pins the third type parameter to the iterator item type `T`.
            _phantom: PhantomData::<T>,
        }
    }
492
493    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
494        type FlushedSnapshotReader =
495            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
496
497        type Iter<'a> = impl StateStoreIter + 'a;
498        type RevIter<'a> = impl StateStoreIter + 'a;
499
500        #[expect(clippy::manual_async_fn)]
501        fn iter(
502            &self,
503            key_range: TableKeyRange,
504            read_options: ReadOptions,
505        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
506            async move {
507                let actual = self
508                    .actual
509                    .iter(key_range.clone(), read_options.clone())
510                    .await?;
511                let expected = if let Some(expected) = &self.expected {
512                    Some(expected.iter(key_range, read_options).await?)
513                } else {
514                    None
515                };
516
517                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
518            }
519        }
520
521        #[expect(clippy::manual_async_fn)]
522        fn rev_iter(
523            &self,
524            key_range: TableKeyRange,
525            read_options: ReadOptions,
526        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
527            async move {
528                let actual = self
529                    .actual
530                    .rev_iter(key_range.clone(), read_options.clone())
531                    .await?;
532                let expected = if let Some(expected) = &self.expected {
533                    Some(expected.rev_iter(key_range, read_options).await?)
534                } else {
535                    None
536                };
537
538                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
539            }
540        }
541
542        fn insert(
543            &mut self,
544            key: TableKey<Bytes>,
545            new_val: Bytes,
546            old_val: Option<Bytes>,
547        ) -> StorageResult<()> {
548            if let Some(expected) = &mut self.expected {
549                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
550            }
551            self.actual.insert(key, new_val, old_val)?;
552
553            Ok(())
554        }
555
556        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
557            if let Some(expected) = &mut self.expected {
558                expected.delete(key.clone(), old_val.clone())?;
559            }
560            self.actual.delete(key, old_val)?;
561            Ok(())
562        }
563
564        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
565            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
566            if let Some(expected) = &mut self.expected {
567                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
568            }
569            Ok(ret)
570        }
571
572        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
573            let ret = self.actual.get_table_watermark(vnode);
574            if let Some(expected) = &self.expected {
575                assert_eq!(ret, expected.get_table_watermark(vnode));
576            }
577            ret
578        }
579
580        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
581            VerifyStateStore {
582                actual: self.actual.new_flushed_snapshot_reader(),
583                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
584                _phantom: Default::default(),
585            }
586        }
587    }
588
589    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
590        for VerifyStateStore<A, E>
591    {
592        async fn flush(&mut self) -> StorageResult<usize> {
593            if let Some(expected) = &mut self.expected {
594                expected.flush().await?;
595            }
596            self.actual.flush().await
597        }
598
599        async fn try_flush(&mut self) -> StorageResult<()> {
600            if let Some(expected) = &mut self.expected {
601                expected.try_flush().await?;
602            }
603            self.actual.try_flush().await
604        }
605
606        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
607            self.actual.init(options.clone()).await?;
608            if let Some(expected) = &mut self.expected {
609                expected.init(options).await?;
610            }
611            Ok(())
612        }
613
614        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
615            if let Some(expected) = &mut self.expected {
616                expected.seal_current_epoch(next_epoch, opts.clone());
617            }
618            self.actual.seal_current_epoch(next_epoch, opts);
619        }
620    }
621
622    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
623        type Local = VerifyStateStore<A::Local, E::Local>;
624        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
625        type VectorWriter = A::VectorWriter;
626
627        fn try_wait_epoch(
628            &self,
629            epoch: HummockReadEpoch,
630            options: TryWaitEpochOptions,
631        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
632            self.actual.try_wait_epoch(epoch, options)
633        }
634
635        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
636            let expected = if let Some(expected) = &self.expected {
637                Some(expected.new_local(option.clone()).await)
638            } else {
639                None
640            };
641            VerifyStateStore {
642                actual: self.actual.new_local(option).await,
643                expected,
644                _phantom: PhantomData::<()>,
645            }
646        }
647
648        async fn new_read_snapshot(
649            &self,
650            epoch: HummockReadEpoch,
651            options: NewReadSnapshotOptions,
652        ) -> StorageResult<Self::ReadSnapshot> {
653            let expected = if let Some(expected) = &self.expected {
654                Some(expected.new_read_snapshot(epoch, options).await?)
655            } else {
656                None
657            };
658            Ok(VerifyStateStore {
659                actual: self.actual.new_read_snapshot(epoch, options).await?,
660                expected,
661                _phantom: PhantomData::<()>,
662            })
663        }
664
665        fn new_vector_writer(
666            &self,
667            options: NewVectorWriterOptions,
668        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
669            self.actual.new_vector_writer(options)
670        }
671    }
672
    /// Dereferences to the `actual` value, so methods of `A` can be called on the wrapper.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        /// Returns a reference to `actual`.
        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
680}
681
682impl StateStoreImpl {
683    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
684    #[allow(clippy::too_many_arguments)]
685    #[expect(clippy::borrowed_box)]
686    pub async fn new(
687        s: &str,
688        opts: Arc<StorageOpts>,
689        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
690        state_store_metrics: Arc<HummockStateStoreMetrics>,
691        object_store_metrics: Arc<ObjectStoreMetrics>,
692        storage_metrics: Arc<MonitoredStorageMetrics>,
693        compactor_metrics: Arc<CompactorMetrics>,
694        await_tree_config: Option<await_tree::Config>,
695        use_new_object_prefix_strategy: bool,
696    ) -> StorageResult<Self> {
697        const KB: usize = 1 << 10;
698        const MB: usize = 1 << 20;
699
700        let meta_cache = {
701            let mut builder = HybridCacheBuilder::new()
702                .with_name("foyer.meta")
703                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
704                .memory(opts.meta_cache_capacity_mb * MB)
705                .with_shards(opts.meta_cache_shard_num)
706                .with_eviction_config(opts.meta_cache_eviction_config.clone())
707                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
708                    u64::BITS as usize / 8 + value.estimate_size()
709                })
710                .storage();
711
712            if !opts.meta_file_cache_dir.is_empty() {
713                if let Err(e) = Feature::ElasticDiskCache.check_available() {
714                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
715                } else {
716                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
717                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
718                        .with_throttle(opts.meta_file_cache_throttle.clone())
719                        .build()
720                        .map_err(HummockError::foyer_error)?;
721                    let engine_builder = BlockEngineBuilder::new(device)
722                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
723                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
724                        .with_flushers(opts.meta_file_cache_flushers)
725                        .with_reclaimers(opts.meta_file_cache_reclaimers)
726                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
727                        .with_clean_block_threshold(
728                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
729                        )
730                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
731                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
732                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
733                            opts.meta_file_cache_fifo_probation_ratio,
734                        ))]);
735                    builder = builder
736                        .with_engine_config(engine_builder)
737                        .with_recover_mode(opts.meta_file_cache_recover_mode)
738                        .with_compression(opts.meta_file_cache_compression)
739                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
740                }
741            }
742
743            builder.build().await.map_err(HummockError::foyer_error)?
744        };
745
746        let block_cache = {
747            let mut builder = HybridCacheBuilder::new()
748                .with_name("foyer.data")
749                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
750                .with_event_listener(Arc::new(BlockCacheEventListener::new(
751                    state_store_metrics.clone(),
752                )))
753                .memory(opts.block_cache_capacity_mb * MB)
754                .with_shards(opts.block_cache_shard_num)
755                .with_eviction_config(opts.block_cache_eviction_config.clone())
756                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
757                    // FIXME(MrCroxx): Calculate block weight more accurately.
758                    u64::BITS as usize * 2 / 8 + value.raw().len()
759                })
760                .storage();
761
762            if !opts.data_file_cache_dir.is_empty() {
763                if let Err(e) = Feature::ElasticDiskCache.check_available() {
764                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
765                } else {
766                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
767                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
768                        .with_throttle(opts.data_file_cache_throttle.clone())
769                        .build()
770                        .map_err(HummockError::foyer_error)?;
771                    let engine_builder = BlockEngineBuilder::new(device)
772                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
773                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
774                        .with_flushers(opts.data_file_cache_flushers)
775                        .with_reclaimers(opts.data_file_cache_reclaimers)
776                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
777                        .with_clean_block_threshold(
778                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
779                        )
780                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
781                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
782                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
783                            opts.data_file_cache_fifo_probation_ratio,
784                        ))]);
785                    builder = builder
786                        .with_engine_config(engine_builder)
787                        .with_recover_mode(opts.data_file_cache_recover_mode)
788                        .with_compression(opts.data_file_cache_compression)
789                        .with_runtime_options(opts.data_file_cache_runtime_config.clone());
790                }
791            }
792
793            builder.build().await.map_err(HummockError::foyer_error)?
794        };
795
796        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
797            .with_shards(opts.vector_meta_cache_shard_num)
798            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
799            .build();
800
801        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
802            .with_shards(opts.vector_block_cache_shard_num)
803            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
804            .build();
805
806        let recent_filter = if opts.data_file_cache_dir.is_empty() {
807            Arc::new(NoneRecentFilter::default().into())
808        } else if opts.cache_refill_recent_filter_shards == 1 {
809            Arc::new(
810                SimpleRecentFilter::new(
811                    opts.cache_refill_recent_filter_layers,
812                    Duration::from_millis(
813                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
814                    ),
815                )
816                .into(),
817            )
818        } else if opts.cache_refill_skip_recent_filter {
819            Arc::new(AllRecentFilter::default().into())
820        } else {
821            Arc::new(
822                ShardedRecentFilter::new(
823                    opts.cache_refill_recent_filter_layers,
824                    Duration::from_millis(
825                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
826                    ),
827                    opts.cache_refill_recent_filter_shards,
828                )
829                .into(),
830            )
831        };
832
833        let store = match s {
834            hummock if hummock.starts_with("hummock+") => {
835                let object_store = build_remote_object_store(
836                    hummock.strip_prefix("hummock+").unwrap(),
837                    object_store_metrics.clone(),
838                    "Hummock",
839                    Arc::new(opts.object_store_config.clone()),
840                )
841                .await;
842
843                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
844                    store: Arc::new(object_store),
845                    path: opts.data_directory.clone(),
846                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
847                    max_prefetch_block_number: opts.max_prefetch_block_number,
848                    recent_filter,
849                    state_store_metrics: state_store_metrics.clone(),
850                    use_new_object_prefix_strategy,
851                    skip_bloom_filter_in_serde: opts.sst_skip_bloom_filter_in_serde,
852
853                    meta_cache,
854                    block_cache,
855                    vector_meta_cache,
856                    vector_block_cache,
857                }));
858                let notification_client =
859                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
860                let compaction_catalog_manager_ref =
861                    Arc::new(CompactionCatalogManager::new(Box::new(
862                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
863                    )));
864
865                let inner = HummockStorage::new(
866                    opts.clone(),
867                    sstable_store,
868                    hummock_meta_client.clone(),
869                    notification_client,
870                    compaction_catalog_manager_ref,
871                    state_store_metrics.clone(),
872                    compactor_metrics.clone(),
873                    await_tree_config,
874                )
875                .await?;
876
877                StateStoreImpl::hummock(inner, storage_metrics)
878            }
879
880            "in_memory" | "in-memory" => {
881                tracing::warn!(
882                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
883                );
884                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
885            }
886
887            sled if sled.starts_with("sled://") => {
888                tracing::warn!(
889                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
890                );
891                let path = sled.strip_prefix("sled://").unwrap();
892                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
893            }
894
895            other => unimplemented!("{} state store is not supported", other),
896        };
897
898        Ok(store)
899    }
900}
901
902pub trait AsHummock: Send + Sync {
903    fn as_hummock(&self) -> Option<&HummockStorage>;
904
905    fn sync(
906        &self,
907        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
908    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
909        async move {
910            if let Some(hummock) = self.as_hummock() {
911                hummock.sync(sync_table_epochs).await
912            } else {
913                Ok(SyncResult::default())
914            }
915        }
916        .boxed()
917    }
918}
919
/// [`HummockStorage`] is the Hummock store itself, so it returns `self`.
impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
925
/// [`MemoryStateStore`] exposes no [`HummockStorage`]; the default
/// [`AsHummock::sync`] therefore resolves to `SyncResult::default()` for it.
impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
931
/// [`SledStateStore`] exposes no [`HummockStorage`]; the default
/// [`AsHummock::sync`] therefore resolves to `SyncResult::default()` for it.
impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
937
938#[cfg(debug_assertions)]
939mod dyn_state_store {
940    use std::future::Future;
941    use std::ops::DerefMut;
942    use std::sync::Arc;
943
944    use bytes::Bytes;
945    use risingwave_common::array::VectorRef;
946    use risingwave_common::bitmap::Bitmap;
947    use risingwave_common::hash::VirtualNode;
948    use risingwave_hummock_sdk::HummockReadEpoch;
949    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
950
951    use crate::error::StorageResult;
952    use crate::hummock::HummockStorage;
953    use crate::store::*;
954    use crate::store_impl::AsHummock;
955    use crate::vector::VectorDistance;
956
    /// Boxed-future counterpart of [`StateStoreIter`], usable as a trait object
    /// (see [`BoxStateStoreIter`]).
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        /// Mirrors `StateStoreIter::try_next`: yields a reference to the next item, if any.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }
961
962    #[async_trait::async_trait]
963    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
964        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
965            self.try_next().await
966        }
967    }
968
    /// A boxed, type-erased state store iterator that may borrow for `'a`.
    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    /// Lets the boxed trait object be used wherever a [`StateStoreIter`] is expected.
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            // Deref the `Box` explicitly so the call is made on the inner
            // `dyn DynStateStoreIter` (presumably to keep it from resolving back to this impl).
            self.deref_mut().try_next()
        }
    }
977
978    // For StateStoreRead
979
    /// Boxed `'static` iterator over [`StateStoreKeyedRow`] items; the return type of
    /// the [`DynStateStoreRead`] methods.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// Boxed `'static` iterator over [`StateStoreReadLogItem`] items; the return type of
    /// `DynStateStoreReadLog::iter_log`.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
982
    /// Trait-object-friendly point lookup. Unlike `StateStoreGet::on_key_value`, which
    /// takes a generic callback, this returns the row as owned data.
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        /// Looks up `key` and returns the matching row as an owned
        /// [`StateStoreKeyedRow`], or `None` when the lookup yields nothing.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }
991
    /// Trait-object-friendly counterpart of [`StateStoreRead`]: the associated iterator
    /// types are replaced by [`BoxStateStoreReadIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        /// Boxed form of `StateStoreRead::iter` over `key_range`.
        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        /// Boxed form of `StateStoreRead::rev_iter` over `key_range`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }
1008
    /// Trait-object-friendly counterpart of [`StateStoreReadLog`]: the associated
    /// change-log iterator type is replaced by [`BoxStateStoreReadChangeLogIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        /// Boxed-future form of `StateStoreReadLog::next_epoch`.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        /// Boxed form of `StateStoreReadLog::iter_log` for `epoch_range` and `key_range`.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }
1019
    /// Shared, type-erased read handle; returned by
    /// `DynLocalStateStore::new_flushed_snapshot_reader`.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1021
1022    #[async_trait::async_trait]
1023    impl<S: StateStoreGet> DynStateStoreGet for S {
1024        async fn get_keyed_row(
1025            &self,
1026            key: TableKey<Bytes>,
1027            read_options: ReadOptions,
1028        ) -> StorageResult<Option<StateStoreKeyedRow>> {
1029            self.on_key_value(key, read_options, move |key, value| {
1030                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1031            })
1032            .await
1033        }
1034    }
1035
1036    #[async_trait::async_trait]
1037    impl<S: StateStoreRead> DynStateStoreRead for S {
1038        async fn iter(
1039            &self,
1040            key_range: TableKeyRange,
1041
1042            read_options: ReadOptions,
1043        ) -> StorageResult<BoxStateStoreReadIter> {
1044            Ok(Box::new(self.iter(key_range, read_options).await?))
1045        }
1046
1047        async fn rev_iter(
1048            &self,
1049            key_range: TableKeyRange,
1050
1051            read_options: ReadOptions,
1052        ) -> StorageResult<BoxStateStoreReadIter> {
1053            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1054        }
1055    }
1056
1057    #[async_trait::async_trait]
1058    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1059        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1060            self.next_epoch(epoch, options).await
1061        }
1062
1063        async fn iter_log(
1064            &self,
1065            epoch_range: (u64, u64),
1066            key_range: TableKeyRange,
1067            options: ReadLogOptions,
1068        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1069            Ok(Box::new(
1070                self.iter_log(epoch_range, key_range, options).await?,
1071            ))
1072        }
1073    }
1074
1075    // For LocalStateStore
    /// Boxed iterator over [`StateStoreKeyedRow`] items that may borrow from the local
    /// store for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// Trait-object-friendly counterpart of [`LocalStateStore`]: associated types are
    /// replaced by boxed / type-erased equivalents.
    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        /// Boxed form of `LocalStateStore::iter`; the iterator may borrow from `self`.
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Boxed form of `LocalStateStore::rev_iter`; the iterator may borrow from `self`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Type-erased form of `LocalStateStore::new_flushed_snapshot_reader`.
        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        /// Same signature as `LocalStateStore::insert`.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        /// Same signature as `LocalStateStore::delete`.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        /// Boxed-future form of `LocalStateStore::update_vnode_bitmap`.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        /// Same signature as `LocalStateStore::get_table_watermark`.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }
1108
1109    #[async_trait::async_trait]
1110    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1111        async fn flush(&mut self) -> StorageResult<usize>;
1112
1113        async fn try_flush(&mut self) -> StorageResult<()>;
1114
1115        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1116
1117        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1118    }
1119
1120    #[async_trait::async_trait]
1121    impl<S: LocalStateStore> DynLocalStateStore for S {
1122        async fn iter(
1123            &self,
1124            key_range: TableKeyRange,
1125            read_options: ReadOptions,
1126        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1127            Ok(Box::new(self.iter(key_range, read_options).await?))
1128        }
1129
1130        async fn rev_iter(
1131            &self,
1132            key_range: TableKeyRange,
1133            read_options: ReadOptions,
1134        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1135            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1136        }
1137
1138        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1139            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1140        }
1141
1142        fn insert(
1143            &mut self,
1144            key: TableKey<Bytes>,
1145            new_val: Bytes,
1146            old_val: Option<Bytes>,
1147        ) -> StorageResult<()> {
1148            self.insert(key, new_val, old_val)
1149        }
1150
1151        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1152            self.delete(key, old_val)
1153        }
1154
1155        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1156            self.update_vnode_bitmap(vnodes).await
1157        }
1158
1159        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1160            self.get_table_watermark(vnode)
1161        }
1162    }
1163
1164    #[async_trait::async_trait]
1165    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1166        async fn flush(&mut self) -> StorageResult<usize> {
1167            self.flush().await
1168        }
1169
1170        async fn try_flush(&mut self) -> StorageResult<()> {
1171            self.try_flush().await
1172        }
1173
1174        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1175            self.init(options).await
1176        }
1177
1178        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1179            self.seal_current_epoch(next_epoch, opts)
1180        }
1181    }
1182
    /// Owned, type-erased local state store; the value produced by
    /// `DynStateStoreExt::new_local`.
    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1184
1185    impl LocalStateStore for BoxDynLocalStateStore {
1186        type FlushedSnapshotReader = StateStoreReadDynRef;
1187        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1188        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1189
1190        fn iter(
1191            &self,
1192            key_range: TableKeyRange,
1193            read_options: ReadOptions,
1194        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1195            (*self.0).iter(key_range, read_options)
1196        }
1197
1198        fn rev_iter(
1199            &self,
1200            key_range: TableKeyRange,
1201            read_options: ReadOptions,
1202        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1203            (*self.0).rev_iter(key_range, read_options)
1204        }
1205
1206        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1207            (*self.0).new_flushed_snapshot_reader()
1208        }
1209
1210        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1211            (*self.0).get_table_watermark(vnode)
1212        }
1213
1214        fn insert(
1215            &mut self,
1216            key: TableKey<Bytes>,
1217            new_val: Bytes,
1218            old_val: Option<Bytes>,
1219        ) -> StorageResult<()> {
1220            (*self.0).insert(key, new_val, old_val)
1221        }
1222
1223        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1224            (*self.0).delete(key, old_val)
1225        }
1226
1227        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1228            (*self.0).update_vnode_bitmap(vnodes).await
1229        }
1230    }
1231
1232    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1233    where
1234        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1235    {
1236        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1237            self.as_mut().flush()
1238        }
1239
1240        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1241            self.as_mut().try_flush()
1242        }
1243
1244        fn init(
1245            &mut self,
1246            options: InitOptions,
1247        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1248            self.as_mut().init(options)
1249        }
1250
1251        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1252            self.as_mut().seal_current_epoch(next_epoch, opts)
1253        }
1254    }
1255
    /// Trait-object-friendly counterpart of [`StateStoreWriteVector`]; epoch control
    /// comes from the [`DynStateStoreWriteEpochControl`] supertrait.
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        /// Same signature as `StateStoreWriteVector::insert`: writes `vec` together with
        /// its associated `info` bytes.
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
    }
1260
1261    #[async_trait::async_trait]
1262    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1263        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1264            self.insert(vec, info)
1265        }
1266    }
1267
    /// Owned, type-erased vector writer; the value produced by
    /// `DynStateStoreExt::new_vector_writer`.
    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1269
1270    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1271        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1272            self.0.insert(vec, info)
1273        }
1274    }
1275
1276    // For global StateStore
1277
    /// Trait-object-friendly counterpart of [`StateStoreReadVector`]. Instead of a
    /// generic per-item callback, results are returned as owned tuples.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        /// Runs a nearest search for `vec` and returns one owned
        /// `(vector, distance, info)` tuple per result item.
        async fn nearest(
            &self,
            vec: VectorRef<'_>,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }
1286
1287    #[async_trait::async_trait]
1288    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1289        async fn nearest(
1290            &self,
1291            vec: VectorRef<'_>,
1292            options: VectorNearestOptions,
1293        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1294            use risingwave_common::types::ScalarRef;
1295            self.nearest(vec, options, |vec, distance, info| {
1296                (
1297                    vec.to_owned_scalar(),
1298                    distance,
1299                    Bytes::copy_from_slice(info),
1300                )
1301            })
1302            .await
1303        }
1304    }
1305
1306    impl<P> StateStoreReadVector for StateStorePointer<P>
1307    where
1308        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1309    {
1310        async fn nearest<'a, O: Send + 'a>(
1311            &'a self,
1312            vec: VectorRef<'a>,
1313            options: VectorNearestOptions,
1314            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1315        ) -> StorageResult<Vec<O>> {
1316            let output = self.as_ref().nearest(vec, options).await?;
1317            Ok(output
1318                .into_iter()
1319                .map(|(vec, distance, info)| {
1320                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1321                })
1322                .collect())
1323        }
1324    }
1325
    /// Marker trait combining [`DynStateStoreRead`] and [`DynStateStoreReadVector`];
    /// it declares no methods of its own.
    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }
1330
    /// Blanket impl: anything satisfying the supertraits is a [`DynStateStoreReadSnapshot`].
    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }
1335
    /// Shared, type-erased read snapshot; the value produced by
    /// `DynStateStoreExt::new_read_snapshot`.
    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
    /// Trait-object-friendly counterpart of the [`StateStore`] methods: associated types
    /// are replaced by the boxed / `Arc`-ed aliases of this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        /// Boxed-future form of `StateStore::try_wait_epoch`.
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        /// Type-erased form of `StateStore::new_local`.
        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        /// Type-erased form of `StateStore::new_read_snapshot`.
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        /// Type-erased form of `StateStore::new_vector_writer`.
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }
1356
1357    #[async_trait::async_trait]
1358    impl<S: StateStore> DynStateStoreExt for S {
1359        async fn try_wait_epoch(
1360            &self,
1361            epoch: HummockReadEpoch,
1362            options: TryWaitEpochOptions,
1363        ) -> StorageResult<()> {
1364            self.try_wait_epoch(epoch, options).await
1365        }
1366
1367        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1368            StateStorePointer(Box::new(self.new_local(option).await))
1369        }
1370
1371        async fn new_read_snapshot(
1372            &self,
1373            epoch: HummockReadEpoch,
1374            options: NewReadSnapshotOptions,
1375        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1376            Ok(StateStorePointer(Arc::new(
1377                self.new_read_snapshot(epoch, options).await?,
1378            )))
1379        }
1380
1381        async fn new_vector_writer(
1382            &self,
1383            options: NewVectorWriterOptions,
1384        ) -> BoxDynStateStoreWriteVector {
1385            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1386        }
1387    }
1388
    /// Shared, fully type-erased state store; implements [`StateStore`],
    /// [`StateStoreReadLog`] and [`AsHummock`] further down in this module.
    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1390
    // Generates `impl AsRef<dyn $target_dyn_trait>` for
    // `StateStorePointer<$pointer<dyn $source_dyn_trait>>`.
    //
    // The generated `as_ref` reborrows the pointee and casts the reference to the
    // target trait object; the target type of `as _` is taken from the return type.
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }
1402
    // Shared-reference views. These `AsRef` impls are what the where-clauses of the
    // generic `StateStoreGet` / `StateStoreRead` / `StateStoreReadVector` impls for
    // `StateStorePointer<P>` are matched against.
    // Read snapshot (`Arc<dyn DynStateStoreReadSnapshot>`): read, get and vector read.
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    // Plain reader (`Arc<dyn DynStateStoreRead>`): read and get.
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    // Local store (`Box<dyn DynLocalStateStore>`): get only.
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1409
    // Generates `impl AsMut<dyn $target_dyn_trait>` for
    // `StateStorePointer<$pointer<dyn $source_dyn_trait>>`.
    //
    // The generated `as_mut` mutably reborrows the pointee and casts the reference to
    // the target trait object; the target type of `as _` is taken from the return type.
    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }
1421
    // Mutable views used by the generic `StateStoreWriteEpochControl` impl for
    // `StateStorePointer<P>`: both boxed writers expose their epoch-control half.
    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );
1427
    /// Newtype around a pointer `P`. The aliases in this module instantiate it with
    /// `Box<dyn ...>` or `Arc<dyn ...>`, and the `StateStore*` traits are implemented
    /// on the wrapper.
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);
1430
1431    impl<P> StateStoreGet for StateStorePointer<P>
1432    where
1433        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1434    {
1435        async fn on_key_value<'a, O: Send + 'a>(
1436            &'a self,
1437            key: TableKey<Bytes>,
1438            read_options: ReadOptions,
1439            on_key_value_fn: impl KeyValueFn<'a, O>,
1440        ) -> StorageResult<Option<O>> {
1441            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1442            option
1443                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1444                .transpose()
1445        }
1446    }
1447
1448    impl<P> StateStoreRead for StateStorePointer<P>
1449    where
1450        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1451    {
1452        type Iter = BoxStateStoreReadIter;
1453        type RevIter = BoxStateStoreReadIter;
1454
1455        fn iter(
1456            &self,
1457            key_range: TableKeyRange,
1458
1459            read_options: ReadOptions,
1460        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1461            self.as_ref().iter(key_range, read_options)
1462        }
1463
1464        fn rev_iter(
1465            &self,
1466            key_range: TableKeyRange,
1467
1468            read_options: ReadOptions,
1469        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1470            self.as_ref().rev_iter(key_range, read_options)
1471        }
1472    }
1473
1474    impl StateStoreReadLog for StateStoreDynRef {
1475        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1476
1477        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1478            (*self.0).next_epoch(epoch, options).await
1479        }
1480
1481        fn iter_log(
1482            &self,
1483            epoch_range: (u64, u64),
1484            key_range: TableKeyRange,
1485            options: ReadLogOptions,
1486        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1487            (*self.0).iter_log(epoch_range, key_range, options)
1488        }
1489    }
1490
    /// Umbrella trait for a fully type-erased state store: change-log reads
    /// ([`DynStateStoreReadLog`]), the [`DynStateStoreExt`] operations and [`AsHummock`].
    /// It declares no methods of its own.
    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1492
1493    impl AsHummock for StateStoreDynRef {
1494        fn as_hummock(&self) -> Option<&HummockStorage> {
1495            (*self.0).as_hummock()
1496        }
1497    }
1498
    /// Blanket impl: anything satisfying the three supertraits is a [`DynStateStore`].
    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1500
1501    impl StateStore for StateStoreDynRef {
1502        type Local = BoxDynLocalStateStore;
1503        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1504        type VectorWriter = BoxDynStateStoreWriteVector;
1505
1506        fn try_wait_epoch(
1507            &self,
1508            epoch: HummockReadEpoch,
1509            options: TryWaitEpochOptions,
1510        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1511            (*self.0).try_wait_epoch(epoch, options)
1512        }
1513
1514        fn new_local(
1515            &self,
1516            option: NewLocalOptions,
1517        ) -> impl Future<Output = Self::Local> + Send + '_ {
1518            (*self.0).new_local(option)
1519        }
1520
1521        async fn new_read_snapshot(
1522            &self,
1523            epoch: HummockReadEpoch,
1524            options: NewReadSnapshotOptions,
1525        ) -> StorageResult<Self::ReadSnapshot> {
1526            (*self.0).new_read_snapshot(epoch, options).await
1527        }
1528
1529        fn new_vector_writer(
1530            &self,
1531            options: NewVectorWriterOptions,
1532        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1533            (*self.0).new_vector_writer(options)
1534        }
1535    }
1536}