risingwave_storage/
store_impl.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23    HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46    SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52    ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
/// Metrics registry handed to the foyer cache builders in this file.
///
/// Created lazily on first access by wrapping a clone of [`GLOBAL_METRICS_REGISTRY`] in a boxed
/// [`PrometheusMetricsRegistry`].
static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});
61
/// Houses the opaque (`impl Trait`) type aliases of the state store backends together with the
/// functions that act as their defining uses.
mod opaque_type {
    use super::*;

    /// Opaque type produced by [`hummock`] for the Hummock state store.
    pub type HummockStorageType = impl StateStore + AsHummock;
    /// Opaque type produced by [`in_memory`] for the in-memory state store.
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    /// Opaque type produced by [`sled`] for the sled state store.
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Defining use of [`MemoryStateStoreType`]: passes the store through
    /// `may_dynamic_dispatch`.
    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Defining use of [`HummockStorageType`]: passes the store through `may_verify` first and
    /// then through `may_dynamic_dispatch`.
    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Defining use of [`SledStateStoreType`]: passes the store through `may_dynamic_dispatch`.
    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
/// A state store wrapped in [`MonitoredStateStore`]. With the `hm-trace` feature enabled, a
/// `TracedStateStore` layer sits between the monitor and the store `S`.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// A state store wrapped in [`MonitoredStateStore`]. Without the `hm-trace` feature the monitor
/// wraps the store `S` directly.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
92
93fn monitored<S: StateStore>(
94    state_store: S,
95    storage_metrics: Arc<MonitoredStorageMetrics>,
96) -> Monitored<S> {
97    let inner = {
98        #[cfg(feature = "hm-trace")]
99        {
100            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
101        }
102        #[cfg(not(feature = "hm-trace"))]
103        {
104            state_store
105        }
106    };
107    inner.monitored(storage_metrics)
108}
109
110fn inner<S>(state_store: &Monitored<S>) -> &S {
111    let inner = state_store.inner();
112    {
113        #[cfg(feature = "hm-trace")]
114        {
115            inner.inner()
116        }
117        #[cfg(not(feature = "hm-trace"))]
118        {
119            inner
120        }
121    }
122}
123
/// The type-erased [`StateStore`].
///
/// Each variant holds one backend wrapped in [`Monitored`]. `EnumAsInner` is derived on this
/// enum.
#[derive(Clone, EnumAsInner)]
#[expect(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this
    /// state store lacks some critical functionality needed to ensure the correctness of
    /// persisting streaming state (e.g., no `read_epoch` support, no async checkpoint).
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// State store backed by [`SledStateStore`]. Like the memory store, the
    /// `dispatch_state_store!` macro refuses to run it when `debug_assertions` is off.
    SledStateStore(Monitored<SledStateStoreType>),
}
144
145fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
146    #[cfg(not(debug_assertions))]
147    {
148        state_store
149    }
150    #[cfg(debug_assertions)]
151    {
152        use crate::store_impl::dyn_state_store::StateStorePointer;
153        StateStorePointer(Arc::new(state_store) as _)
154    }
155}
156
157fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
158    #[cfg(not(debug_assertions))]
159    {
160        state_store
161    }
162    #[cfg(debug_assertions)]
163    {
164        use std::marker::PhantomData;
165
166        use risingwave_common::util::env_var::env_var_is_true;
167        use tracing::info;
168
169        use crate::store_impl::verify::VerifyStateStore;
170
171        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
172            info!("enable verify state store");
173            Some(SledStateStore::new_temp())
174        } else {
175            info!("verify state store is not enabled");
176            None
177        };
178        VerifyStateStore {
179            actual: state_store,
180            expected,
181            _phantom: PhantomData::<()>,
182        }
183    }
184}
185
186impl StateStoreImpl {
187    fn in_memory(
188        state_store: MemoryStateStore,
189        storage_metrics: Arc<MonitoredStorageMetrics>,
190    ) -> Self {
191        // The specific type of MemoryStateStoreType in deducted here.
192        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193    }
194
195    pub fn hummock(
196        state_store: HummockStorage,
197        storage_metrics: Arc<MonitoredStorageMetrics>,
198    ) -> Self {
199        // The specific type of HummockStateStoreType in deducted here.
200        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201    }
202
203    pub fn sled(
204        state_store: SledStateStore,
205        storage_metrics: Arc<MonitoredStorageMetrics>,
206    ) -> Self {
207        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208    }
209
210    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212    }
213
214    pub fn for_test() -> Self {
215        Self::in_memory(
216            MemoryStateStore::new(),
217            Arc::new(MonitoredStorageMetrics::unused()),
218        )
219    }
220
221    pub fn as_hummock(&self) -> Option<&HummockStorage> {
222        match self {
223            StateStoreImpl::HummockStateStore(hummock) => {
224                Some(inner(hummock).as_hummock().expect("should be hummock"))
225            }
226            _ => None,
227        }
228    }
229}
230
231impl Debug for StateStoreImpl {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237        }
238    }
239}
240
/// Dispatches on a `StateStoreImpl` value and evaluates `$body` with `$store` bound to the store
/// wrapped by the active variant.
///
/// * `$impl`: expression evaluating to a `StateStoreImpl`.
/// * `$store`: identifier that `$body` uses to refer to the wrapped store.
/// * `$body`: token tree evaluated with `$store` in scope.
///
/// The Hummock arm always evaluates `$body`. The memory and sled arms evaluate `$body` only when
/// `debug_assertions` is on; otherwise they hit `unimplemented!` at runtime.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling memory backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling memory backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
279
280#[cfg(any(debug_assertions, test, feature = "test"))]
281pub mod verify {
282    use std::fmt::Debug;
283    use std::future::Future;
284    use std::marker::PhantomData;
285    use std::ops::Deref;
286    use std::sync::Arc;
287
288    use bytes::Bytes;
289    use risingwave_common::array::VectorRef;
290    use risingwave_common::bitmap::Bitmap;
291    use risingwave_common::hash::VirtualNode;
292    use risingwave_hummock_sdk::HummockReadEpoch;
293    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
294    use tracing::log::warn;
295
296    use crate::error::StorageResult;
297    use crate::hummock::HummockStorage;
298    use crate::store::*;
299    use crate::store_impl::AsHummock;
300
301    #[expect(dead_code)]
302    fn assert_result_eq<Item: PartialEq + Debug, E>(
303        first: &std::result::Result<Item, E>,
304        second: &std::result::Result<Item, E>,
305    ) {
306        match (first, second) {
307            (Ok(first), Ok(second)) => {
308                if first != second {
309                    warn!("result different: {:?} {:?}", first, second);
310                }
311                assert_eq!(first, second);
312            }
313            (Err(_), Err(_)) => {}
314            _ => {
315                warn!("one success and one failed");
316                panic!("result not equal");
317            }
318        }
319    }
320
    /// Pairs an `actual` value with an optional `expected` reference value.
    ///
    /// The trait impls in this module forward operations to `actual` and, when `expected` is
    /// `Some`, typically replay them on `expected` and assert that both sides agree.
    ///
    /// `T` is carried only through `PhantomData`. `verify_iter` sets it to the iterator item
    /// type; the store wrappers in this module use the default `()`.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The value whose results are returned to the caller.
        pub actual: A,
        /// Optional reference value; `None` disables the comparison.
        pub expected: Option<E>,
        /// Marker for the otherwise unused type parameter `T`.
        pub _phantom: PhantomData<T>,
    }
327
    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        /// Delegates to `actual`; the `expected` side is never consulted.
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }
333
    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        /// Looks up `key` in `actual`, optionally cross-checks the result against `expected`,
        /// and then applies `on_key_value_fn` to the entry found in `actual`.
        ///
        /// Returns `Ok(None)` when `actual` has no entry. Errors from either store or from
        /// `on_key_value_fn` are propagated.
        ///
        /// # Panics
        ///
        /// Panics if `expected` is present and its entry differs from the one in `actual`.
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            // Copy the entry out as owned data so it can be compared first and handed to the
            // caller's callback afterwards.
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                // Compare the pure epoch together with the full (key, value) entry.
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            // Run the caller's callback on the owned copy; `transpose` turns
            // `Option<Result<O>>` into `Result<Option<O>>`.
            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }
368
    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        /// Delegates the nearest-vector query to `actual` only; no comparison against `expected`
        /// is performed here.
        fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> impl StorageFuture<'a, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }
381
    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        /// Opaque iterator type; its concrete type is the value `verify_iter` returns in `iter`.
        type Iter = impl StateStoreReadIter;
        /// Opaque iterator type; its concrete type is the value `verify_iter` returns in
        /// `rev_iter`.
        type RevIter = impl StateStoreReadIter;

        /// Opens an iterator over `key_range` on `actual` and, if present, on `expected`, and
        /// returns them combined by `verify_iter`.
        ///
        /// Errors from opening either iterator are propagated.
        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
        // fail to compile.
        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        /// Same as `iter`, but opens the iterators with `rev_iter` on both sides.
        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }
432
433    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
434        type ChangeLogIter = impl StateStoreReadChangeLogIter;
435
436        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
437            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
438            if let Some(expected) = &self.expected {
439                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
440            }
441            Ok(actual)
442        }
443
444        async fn iter_log(
445            &self,
446            epoch_range: (u64, u64),
447            key_range: TableKeyRange,
448            options: ReadLogOptions,
449        ) -> StorageResult<Self::ChangeLogIter> {
450            let actual = self
451                .actual
452                .iter_log(epoch_range, key_range.clone(), options.clone())
453                .await?;
454            let expected = if let Some(expected) = &self.expected {
455                Some(expected.iter_log(epoch_range, key_range, options).await?)
456            } else {
457                None
458            };
459
460            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
461        }
462    }
463
    /// Iterator flavour of the wrapper: both sides are advanced together and must yield equal
    /// items.
    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        /// Advances `actual` and, if present, `expected` by one step and returns the item from
        /// `actual`. Errors from either side are propagated.
        ///
        /// # Panics
        ///
        /// Panics if `expected` is present and yields a different item (including one side
        /// ending before the other).
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }
478
    /// Combines an `actual` iterator with an optional `expected` iterator into a single
    /// iterator, built as a `VerifyStateStore` whose marker type is the item type `T`.
    ///
    /// Passing `None` for `expected` yields an iterator that performs no comparison.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }
492
    /// Local-store flavour of the wrapper: writes are applied to both sides, reads are served
    /// from `actual` and cross-checked against `expected` when it is present.
    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        /// Snapshot reader that pairs the snapshot readers of both sides.
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        /// Opaque iterator type; its concrete type is the value `verify_iter` returns in `iter`.
        type Iter<'a> = impl StateStoreIter + 'a;
        /// Opaque iterator type; its concrete type is the value `verify_iter` returns in
        /// `rev_iter`.
        type RevIter<'a> = impl StateStoreIter + 'a;

        /// Opens an iterator over `key_range` on `actual` and, if present, on `expected`, and
        /// returns them combined by `verify_iter`. Errors from either side are propagated.
        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        /// Same as `iter`, but opens the iterators with `rev_iter` on both sides.
        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        /// Applies the insert to `expected` first (if present) and then to `actual`. The first
        /// error encountered is returned.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        /// Applies the delete to `expected` first (if present) and then to `actual`. The first
        /// error encountered is returned.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        /// Updates the vnode bitmap on `actual` and, if present, on `expected`, returning the
        /// value produced by `actual`.
        ///
        /// # Panics
        ///
        /// Panics if `expected` is present and returns a different bitmap.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        /// Returns the table watermark reported by `actual` for `vnode`.
        ///
        /// # Panics
        ///
        /// Panics if `expected` is present and reports a different watermark.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        /// Creates a snapshot reader on `actual` and, if present, on `expected`, and pairs them
        /// in a new `VerifyStateStore`.
        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }
588
589    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
590        for VerifyStateStore<A, E>
591    {
592        async fn flush(&mut self) -> StorageResult<usize> {
593            if let Some(expected) = &mut self.expected {
594                expected.flush().await?;
595            }
596            self.actual.flush().await
597        }
598
599        async fn try_flush(&mut self) -> StorageResult<()> {
600            if let Some(expected) = &mut self.expected {
601                expected.try_flush().await?;
602            }
603            self.actual.try_flush().await
604        }
605
606        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
607            self.actual.init(options.clone()).await?;
608            if let Some(expected) = &mut self.expected {
609                expected.init(options).await?;
610            }
611            Ok(())
612        }
613
614        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
615            if let Some(expected) = &mut self.expected {
616                expected.seal_current_epoch(next_epoch, opts.clone());
617            }
618            self.actual.seal_current_epoch(next_epoch, opts);
619        }
620    }
621
622    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
623        type Local = VerifyStateStore<A::Local, E::Local>;
624        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
625        type VectorWriter = A::VectorWriter;
626
627        fn try_wait_epoch(
628            &self,
629            epoch: HummockReadEpoch,
630            options: TryWaitEpochOptions,
631        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
632            self.actual.try_wait_epoch(epoch, options)
633        }
634
635        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
636            let expected = if let Some(expected) = &self.expected {
637                Some(expected.new_local(option.clone()).await)
638            } else {
639                None
640            };
641            VerifyStateStore {
642                actual: self.actual.new_local(option).await,
643                expected,
644                _phantom: PhantomData::<()>,
645            }
646        }
647
648        async fn new_read_snapshot(
649            &self,
650            epoch: HummockReadEpoch,
651            options: NewReadSnapshotOptions,
652        ) -> StorageResult<Self::ReadSnapshot> {
653            let expected = if let Some(expected) = &self.expected {
654                Some(expected.new_read_snapshot(epoch, options).await?)
655            } else {
656                None
657            };
658            Ok(VerifyStateStore {
659                actual: self.actual.new_read_snapshot(epoch, options).await?,
660                expected,
661                _phantom: PhantomData::<()>,
662            })
663        }
664
665        fn new_vector_writer(
666            &self,
667            options: NewVectorWriterOptions,
668        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
669            self.actual.new_vector_writer(options)
670        }
671    }
672
    /// Dereferences to the `actual` value, so its methods can be called directly on the wrapper.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
680}
681
682impl StateStoreImpl {
683    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
684    #[expect(clippy::borrowed_box)]
685    pub async fn new(
686        s: &str,
687        opts: Arc<StorageOpts>,
688        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
689        state_store_metrics: Arc<HummockStateStoreMetrics>,
690        object_store_metrics: Arc<ObjectStoreMetrics>,
691        storage_metrics: Arc<MonitoredStorageMetrics>,
692        compactor_metrics: Arc<CompactorMetrics>,
693        await_tree_config: Option<await_tree::Config>,
694        use_new_object_prefix_strategy: bool,
695    ) -> StorageResult<Self> {
696        const KB: usize = 1 << 10;
697        const MB: usize = 1 << 20;
698
699        let meta_cache = {
700            let mut builder = HybridCacheBuilder::new()
701                .with_name("foyer.meta")
702                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
703                .memory(opts.meta_cache_capacity_mb * MB)
704                .with_shards(opts.meta_cache_shard_num)
705                .with_eviction_config(opts.meta_cache_eviction_config.clone())
706                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
707                    u64::BITS as usize / 8 + value.estimate_size()
708                })
709                .storage();
710
711            if !opts.meta_file_cache_dir.is_empty() {
712                if let Err(e) = Feature::ElasticDiskCache.check_available() {
713                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
714                } else {
715                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
716                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
717                        .with_throttle(opts.meta_file_cache_throttle.clone())
718                        .build()
719                        .map_err(HummockError::foyer_error)?;
720                    let engine_builder = BlockEngineBuilder::new(device)
721                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
722                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
723                        .with_flushers(opts.meta_file_cache_flushers)
724                        .with_reclaimers(opts.meta_file_cache_reclaimers)
725                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
726                        .with_clean_block_threshold(
727                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
728                        )
729                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
730                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
731                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
732                            opts.meta_file_cache_fifo_probation_ratio,
733                        ))]);
734                    builder = builder
735                        .with_engine_config(engine_builder)
736                        .with_recover_mode(opts.meta_file_cache_recover_mode)
737                        .with_compression(opts.meta_file_cache_compression)
738                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
739                }
740            }
741
742            builder.build().await.map_err(HummockError::foyer_error)?
743        };
744
745        let block_cache = {
746            let mut builder = HybridCacheBuilder::new()
747                .with_name("foyer.data")
748                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
749                .with_event_listener(Arc::new(BlockCacheEventListener::new(
750                    state_store_metrics.clone(),
751                )))
752                .memory(opts.block_cache_capacity_mb * MB)
753                .with_shards(opts.block_cache_shard_num)
754                .with_eviction_config(opts.block_cache_eviction_config.clone())
755                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
756                    // FIXME(MrCroxx): Calculate block weight more accurately.
757                    u64::BITS as usize * 2 / 8 + value.raw().len()
758                })
759                .storage();
760
761            if !opts.data_file_cache_dir.is_empty() {
762                if let Err(e) = Feature::ElasticDiskCache.check_available() {
763                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
764                } else {
765                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
766                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
767                        .with_throttle(opts.data_file_cache_throttle.clone())
768                        .build()
769                        .map_err(HummockError::foyer_error)?;
770                    let engine_builder = BlockEngineBuilder::new(device)
771                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
772                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
773                        .with_flushers(opts.data_file_cache_flushers)
774                        .with_reclaimers(opts.data_file_cache_reclaimers)
775                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
776                        .with_clean_block_threshold(
777                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
778                        )
779                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
780                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
781                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
782                            opts.data_file_cache_fifo_probation_ratio,
783                        ))]);
784                    builder = builder
785                        .with_engine_config(engine_builder)
786                        .with_recover_mode(opts.data_file_cache_recover_mode)
787                        .with_compression(opts.data_file_cache_compression)
788                        .with_runtime_options(opts.data_file_cache_runtime_config.clone());
789                }
790            }
791
792            builder.build().await.map_err(HummockError::foyer_error)?
793        };
794
795        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
796            .with_shards(opts.vector_meta_cache_shard_num)
797            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
798            .build();
799
800        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
801            .with_shards(opts.vector_block_cache_shard_num)
802            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
803            .build();
804
805        let recent_filter = if opts.data_file_cache_dir.is_empty() {
806            Arc::new(NoneRecentFilter::default().into())
807        } else if opts.cache_refill_recent_filter_shards == 1 {
808            Arc::new(
809                SimpleRecentFilter::new(
810                    opts.cache_refill_recent_filter_layers,
811                    Duration::from_millis(
812                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
813                    ),
814                )
815                .into(),
816            )
817        } else if opts.cache_refill_skip_recent_filter {
818            Arc::new(AllRecentFilter::default().into())
819        } else {
820            Arc::new(
821                ShardedRecentFilter::new(
822                    opts.cache_refill_recent_filter_layers,
823                    Duration::from_millis(
824                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
825                    ),
826                    opts.cache_refill_recent_filter_shards,
827                )
828                .into(),
829            )
830        };
831
832        let store = match s {
833            hummock if hummock.starts_with("hummock+") => {
834                let object_store = build_remote_object_store(
835                    hummock.strip_prefix("hummock+").unwrap(),
836                    object_store_metrics.clone(),
837                    "Hummock",
838                    Arc::new(opts.object_store_config.clone()),
839                )
840                .await;
841
842                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
843                    store: Arc::new(object_store),
844                    path: opts.data_directory.clone(),
845                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
846                    max_prefetch_block_number: opts.max_prefetch_block_number,
847                    recent_filter,
848                    state_store_metrics: state_store_metrics.clone(),
849                    use_new_object_prefix_strategy,
850                    skip_bloom_filter_in_serde: opts.sst_skip_bloom_filter_in_serde,
851
852                    meta_cache,
853                    block_cache,
854                    vector_meta_cache,
855                    vector_block_cache,
856                }));
857                let notification_client =
858                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
859                let compaction_catalog_manager_ref =
860                    Arc::new(CompactionCatalogManager::new(Box::new(
861                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
862                    )));
863
864                let inner = HummockStorage::new(
865                    opts.clone(),
866                    sstable_store,
867                    hummock_meta_client.clone(),
868                    notification_client,
869                    compaction_catalog_manager_ref,
870                    state_store_metrics.clone(),
871                    compactor_metrics.clone(),
872                    await_tree_config,
873                )
874                .await?;
875
876                StateStoreImpl::hummock(inner, storage_metrics)
877            }
878
879            "in_memory" | "in-memory" => {
880                tracing::warn!(
881                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
882                );
883                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
884            }
885
886            sled if sled.starts_with("sled://") => {
887                tracing::warn!(
888                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
889                );
890                let path = sled.strip_prefix("sled://").unwrap();
891                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
892            }
893
894            other => unimplemented!("{} state store is not supported", other),
895        };
896
897        Ok(store)
898    }
899}
900
901pub trait AsHummock: Send + Sync {
902    fn as_hummock(&self) -> Option<&HummockStorage>;
903
904    fn sync(
905        &self,
906        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
907    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
908        async move {
909            if let Some(hummock) = self.as_hummock() {
910                hummock.sync(sync_table_epochs).await
911            } else {
912                Ok(SyncResult::default())
913            }
914        }
915        .boxed()
916    }
917}
918
/// `HummockStorage` is itself the Hummock backend, so it hands out a reference to itself.
impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
924
/// `MemoryStateStore` never exposes a Hummock backend: `as_hummock` always returns `None`.
impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
930
/// `SledStateStore` never exposes a Hummock backend: `as_hummock` always returns `None`.
impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
936
937#[cfg(debug_assertions)]
938mod dyn_state_store {
939    use std::future::Future;
940    use std::ops::DerefMut;
941    use std::sync::Arc;
942
943    use bytes::Bytes;
944    use risingwave_common::array::VectorRef;
945    use risingwave_common::bitmap::Bitmap;
946    use risingwave_common::hash::VirtualNode;
947    use risingwave_hummock_sdk::HummockReadEpoch;
948    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
949
950    use crate::error::StorageResult;
951    use crate::hummock::HummockStorage;
952    use crate::store::*;
953    use crate::store_impl::AsHummock;
954    use crate::vector::VectorDistance;
955
    /// Boxed-future (`async_trait`) counterpart of `StateStoreIter`, so that an iterator
    /// can be used behind `dyn` (see `BoxStateStoreIter`).
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        /// Advances the iterator and returns the next item, if any.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }
960
961    #[async_trait::async_trait]
962    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
963        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
964            self.try_next().await
965        }
966    }
967
    /// A boxed `DynStateStoreIter` trait object whose contents may borrow for `'a`.
    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    /// Lets the boxed trait object be used wherever a `StateStoreIter<T>` is expected.
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            // Deref the `Box` explicitly so that `try_next` is invoked on the inner
            // `dyn DynStateStoreIter` object.
            self.deref_mut().try_next()
        }
    }
976
977    // For StateStoreRead
978
    /// Boxed `'static` iterator over `StateStoreKeyedRow` items; the iterator type returned
    /// by `DynStateStoreRead::iter` / `rev_iter`.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// Boxed `'static` iterator over `StateStoreReadLogItem` items; the iterator type
    /// returned by `DynStateStoreReadLog::iter_log`.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
981
    /// Boxed-future (`async_trait`) point-lookup interface, implemented for every
    /// `StateStoreGet` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        /// Looks up `key` and returns the matching row as owned data, or `None` when the
        /// lookup yields nothing.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }
990
    /// Boxed-future (`async_trait`) range-read interface, implemented for every
    /// `StateStoreRead` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        /// Opens a boxed iterator over `key_range`; forwards to `StateStoreRead::iter`.
        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        /// Opens a boxed iterator over `key_range`; forwards to `StateStoreRead::rev_iter`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }
1007
    /// Boxed-future (`async_trait`) change-log interface, implemented for every
    /// `StateStoreReadLog` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        /// Forwards to `StateStoreReadLog::next_epoch`.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        /// Forwards to `StateStoreReadLog::iter_log` and boxes the resulting iterator.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }
1018
    /// Shared (`Arc`) handle to a type-erased `DynStateStoreRead`, wrapped in
    /// `StateStorePointer`.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1020
1021    #[async_trait::async_trait]
1022    impl<S: StateStoreGet> DynStateStoreGet for S {
1023        async fn get_keyed_row(
1024            &self,
1025            key: TableKey<Bytes>,
1026            read_options: ReadOptions,
1027        ) -> StorageResult<Option<StateStoreKeyedRow>> {
1028            self.on_key_value(key, read_options, move |key, value| {
1029                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1030            })
1031            .await
1032        }
1033    }
1034
1035    #[async_trait::async_trait]
1036    impl<S: StateStoreRead> DynStateStoreRead for S {
1037        async fn iter(
1038            &self,
1039            key_range: TableKeyRange,
1040
1041            read_options: ReadOptions,
1042        ) -> StorageResult<BoxStateStoreReadIter> {
1043            Ok(Box::new(self.iter(key_range, read_options).await?))
1044        }
1045
1046        async fn rev_iter(
1047            &self,
1048            key_range: TableKeyRange,
1049
1050            read_options: ReadOptions,
1051        ) -> StorageResult<BoxStateStoreReadIter> {
1052            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1053        }
1054    }
1055
1056    #[async_trait::async_trait]
1057    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1058        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1059            self.next_epoch(epoch, options).await
1060        }
1061
1062        async fn iter_log(
1063            &self,
1064            epoch_range: (u64, u64),
1065            key_range: TableKeyRange,
1066            options: ReadLogOptions,
1067        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1068            Ok(Box::new(
1069                self.iter_log(epoch_range, key_range, options).await?,
1070            ))
1071        }
1072    }
1073
1074    // For LocalStateStore
    /// Boxed iterator over `StateStoreKeyedRow` items that may borrow from the local store
    /// for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// Boxed-future (`async_trait`) counterpart of `LocalStateStore`; implemented for every
    /// `LocalStateStore` by the blanket impl in this module, each method forwarding to the
    /// `LocalStateStore` method of the same name.
    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        /// Opens a boxed iterator over `key_range`; the iterator may borrow from `self`.
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Same as `iter`, but forwards to `LocalStateStore::rev_iter`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Returns the store's flushed-snapshot reader as a type-erased, shared handle.
        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        /// Forwards to `LocalStateStore::insert`.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        /// Forwards to `LocalStateStore::delete`.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        /// Forwards to `LocalStateStore::update_vnode_bitmap`.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        /// Forwards to `LocalStateStore::get_table_watermark`.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }
1107
    /// Boxed-future (`async_trait`) counterpart of `StateStoreWriteEpochControl`;
    /// implemented for every `StateStoreWriteEpochControl` by the blanket impl in this
    /// module, each method forwarding to the method of the same name.
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
        /// Forwards to `StateStoreWriteEpochControl::flush`.
        async fn flush(&mut self) -> StorageResult<usize>;

        /// Forwards to `StateStoreWriteEpochControl::try_flush`.
        async fn try_flush(&mut self) -> StorageResult<()>;

        /// Forwards to `StateStoreWriteEpochControl::init`.
        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

        /// Forwards to `StateStoreWriteEpochControl::seal_current_epoch`.
        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
    }
1118
1119    #[async_trait::async_trait]
1120    impl<S: LocalStateStore> DynLocalStateStore for S {
1121        async fn iter(
1122            &self,
1123            key_range: TableKeyRange,
1124            read_options: ReadOptions,
1125        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1126            Ok(Box::new(self.iter(key_range, read_options).await?))
1127        }
1128
1129        async fn rev_iter(
1130            &self,
1131            key_range: TableKeyRange,
1132            read_options: ReadOptions,
1133        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1134            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1135        }
1136
1137        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1138            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1139        }
1140
1141        fn insert(
1142            &mut self,
1143            key: TableKey<Bytes>,
1144            new_val: Bytes,
1145            old_val: Option<Bytes>,
1146        ) -> StorageResult<()> {
1147            self.insert(key, new_val, old_val)
1148        }
1149
1150        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1151            self.delete(key, old_val)
1152        }
1153
1154        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1155            self.update_vnode_bitmap(vnodes).await
1156        }
1157
1158        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1159            self.get_table_watermark(vnode)
1160        }
1161    }
1162
1163    #[async_trait::async_trait]
1164    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1165        async fn flush(&mut self) -> StorageResult<usize> {
1166            self.flush().await
1167        }
1168
1169        async fn try_flush(&mut self) -> StorageResult<()> {
1170            self.try_flush().await
1171        }
1172
1173        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1174            self.init(options).await
1175        }
1176
1177        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1178            self.seal_current_epoch(next_epoch, opts)
1179        }
1180    }
1181
    /// Owned (`Box`) handle to a type-erased `DynLocalStateStore`, wrapped in
    /// `StateStorePointer`.
    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;

    /// Makes the boxed trait object usable as a `LocalStateStore` again.
    ///
    /// Every method dereferences the box with `(*self.0)` and calls the same-named
    /// `DynLocalStateStore` method on the inner trait object.
    impl LocalStateStore for BoxDynLocalStateStore {
        type FlushedSnapshotReader = StateStoreReadDynRef;
        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;

        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            (*self.0).iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            (*self.0).rev_iter(key_range, read_options)
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            (*self.0).new_flushed_snapshot_reader()
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            (*self.0).get_table_watermark(vnode)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            (*self.0).insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            (*self.0).delete(key, old_val)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            (*self.0).update_vnode_bitmap(vnodes).await
        }
    }
1230
    /// Implements `StateStoreWriteEpochControl` for any `StateStorePointer<P>` that can be
    /// viewed as `&mut dyn DynStateStoreWriteEpochControl` through `AsMut`.
    ///
    /// Each method obtains that view with `as_mut()` and calls the same-named method of
    /// `DynStateStoreWriteEpochControl` on it.
    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
    where
        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
    {
        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
            self.as_mut().flush()
        }

        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().try_flush()
        }

        fn init(
            &mut self,
            options: InitOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().init(options)
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.as_mut().seal_current_epoch(next_epoch, opts)
        }
    }
1254
    /// Counterpart of `StateStoreWriteVector` for use behind `dyn`; implemented for every
    /// `StateStoreWriteVector` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        /// Forwards to `StateStoreWriteVector::insert`.
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
    }
1259
1260    #[async_trait::async_trait]
1261    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1262        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1263            self.insert(vec, info)
1264        }
1265    }
1266
    /// Owned (`Box`) handle to a type-erased `DynStateStoreWriteVector`, wrapped in
    /// `StateStorePointer`.
    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;

    /// Makes the boxed trait object usable as a `StateStoreWriteVector` again.
    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
            // `self.0` is the inner `Box<dyn DynStateStoreWriteVector>`.
            self.0.insert(vec, info)
        }
    }
1274
1275    // For global StateStore
1276
    /// Boxed-future (`async_trait`) counterpart of `StateStoreReadVector`; implemented for
    /// every `StateStoreReadVector` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        /// Runs a `nearest` query for `vec` and returns each hit as owned
        /// `(vector, distance, info)` data.
        async fn nearest(
            &self,
            vec: VectorRef<'_>,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }
1285
1286    #[async_trait::async_trait]
1287    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1288        async fn nearest(
1289            &self,
1290            vec: VectorRef<'_>,
1291            options: VectorNearestOptions,
1292        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1293            use risingwave_common::types::ScalarRef;
1294            self.nearest(vec, options, |vec, distance, info| {
1295                (
1296                    vec.to_owned_scalar(),
1297                    distance,
1298                    Bytes::copy_from_slice(info),
1299                )
1300            })
1301            .await
1302        }
1303    }
1304
1305    impl<P> StateStoreReadVector for StateStorePointer<P>
1306    where
1307        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1308    {
1309        async fn nearest<'a, O: Send + 'a>(
1310            &'a self,
1311            vec: VectorRef<'a>,
1312            options: VectorNearestOptions,
1313            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1314        ) -> StorageResult<Vec<O>> {
1315            let output = self.as_ref().nearest(vec, options).await?;
1316            Ok(output
1317                .into_iter()
1318                .map(|(vec, distance, info)| {
1319                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1320                })
1321                .collect())
1322        }
1323    }
1324
    /// Marker trait combining `DynStateStoreRead` and `DynStateStoreReadVector`; it adds no
    /// methods of its own.
    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }

    /// Every type satisfying the supertrait bounds is automatically a
    /// `DynStateStoreReadSnapshot`.
    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }
1334
    /// Shared (`Arc`) handle to a type-erased `DynStateStoreReadSnapshot`, wrapped in
    /// `StateStorePointer`.
    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
    /// Boxed-future (`async_trait`) counterpart of the methods of `StateStore`; implemented
    /// for every `StateStore` by the blanket impl in this module.
    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        /// Forwards to `StateStore::try_wait_epoch`.
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        /// Creates a local state store via `StateStore::new_local` and boxes it.
        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        /// Creates a read snapshot via `StateStore::new_read_snapshot` and wraps it in an
        /// `Arc`.
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        /// Creates a vector writer via `StateStore::new_vector_writer` and boxes it.
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }
1355
1356    #[async_trait::async_trait]
1357    impl<S: StateStore> DynStateStoreExt for S {
1358        async fn try_wait_epoch(
1359            &self,
1360            epoch: HummockReadEpoch,
1361            options: TryWaitEpochOptions,
1362        ) -> StorageResult<()> {
1363            self.try_wait_epoch(epoch, options).await
1364        }
1365
1366        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1367            StateStorePointer(Box::new(self.new_local(option).await))
1368        }
1369
1370        async fn new_read_snapshot(
1371            &self,
1372            epoch: HummockReadEpoch,
1373            options: NewReadSnapshotOptions,
1374        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1375            Ok(StateStorePointer(Arc::new(
1376                self.new_read_snapshot(epoch, options).await?,
1377            )))
1378        }
1379
1380        async fn new_vector_writer(
1381            &self,
1382            options: NewVectorWriterOptions,
1383        ) -> BoxDynStateStoreWriteVector {
1384            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1385        }
1386    }
1387
    /// Shared (`Arc`) handle to a type-erased `DynStateStore`, wrapped in
    /// `StateStorePointer`.
    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1389
    // Generates `impl AsRef<dyn $target_dyn_trait>` for
    // `StateStorePointer<$pointer<dyn $source_dyn_trait>>`: the inner pointer is
    // dereferenced and the resulting reference is cast to the target trait object.
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    // Shared-reference views available on each pointer alias.
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1408
    // Generates `impl AsMut<dyn $target_dyn_trait>` for
    // `StateStorePointer<$pointer<dyn $source_dyn_trait>>`: the inner pointer is mutably
    // dereferenced and the resulting reference is cast to the target trait object.
    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    // Mutable views used by the `StateStoreWriteEpochControl` impl for `StateStorePointer`.
    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );
1426
    /// Newtype around a pointer `P`; the aliases in this module instantiate it with
    /// `Box<dyn ...>` or `Arc<dyn ...>` trait objects and implement the static state store
    /// traits on the wrapper.
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);
1429
1430    impl<P> StateStoreGet for StateStorePointer<P>
1431    where
1432        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1433    {
1434        async fn on_key_value<'a, O: Send + 'a>(
1435            &'a self,
1436            key: TableKey<Bytes>,
1437            read_options: ReadOptions,
1438            on_key_value_fn: impl KeyValueFn<'a, O>,
1439        ) -> StorageResult<Option<O>> {
1440            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1441            option
1442                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1443                .transpose()
1444        }
1445    }
1446
    /// Implements `StateStoreRead` for any `StateStorePointer<P>` that can be viewed as
    /// `&dyn DynStateStoreRead` through `AsRef`.
    ///
    /// Both methods obtain that view with `as_ref()` and return the boxed future produced
    /// by the corresponding `DynStateStoreRead` method.
    impl<P> StateStoreRead for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
    {
        type Iter = BoxStateStoreReadIter;
        type RevIter = BoxStateStoreReadIter;

        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            self.as_ref().iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            self.as_ref().rev_iter(key_range, read_options)
        }
    }
1472
    /// Makes the type-erased store usable as a `StateStoreReadLog` again by calling the
    /// same-named methods on the inner `dyn DynStateStore` object.
    impl StateStoreReadLog for StateStoreDynRef {
        type ChangeLogIter = BoxStateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            (*self.0).next_epoch(epoch, options).await
        }

        fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
            (*self.0).iter_log(epoch_range, key_range, options)
        }
    }
1489
    /// Umbrella trait for a fully type-erased state store: it combines
    /// `DynStateStoreReadLog`, `DynStateStoreExt` and `AsHummock` and adds no methods.
    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}

    /// Forwards `as_hummock` to the inner `dyn DynStateStore` object.
    impl AsHummock for StateStoreDynRef {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            (*self.0).as_hummock()
        }
    }

    /// Every type satisfying the supertrait bounds is automatically a `DynStateStore`.
    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1499
    /// Makes the type-erased store usable as a `StateStore` again.
    ///
    /// Every method dereferences the `Arc` with `(*self.0)` and calls the same-named
    /// `DynStateStoreExt` method on the inner trait object; the associated types are the
    /// type-erased pointer aliases defined in this module.
    impl StateStore for StateStoreDynRef {
        type Local = BoxDynLocalStateStore;
        type ReadSnapshot = StateStoreReadSnapshotDynRef;
        type VectorWriter = BoxDynStateStoreWriteVector;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            (*self.0).try_wait_epoch(epoch, options)
        }

        fn new_local(
            &self,
            option: NewLocalOptions,
        ) -> impl Future<Output = Self::Local> + Send + '_ {
            (*self.0).new_local(option)
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            (*self.0).new_read_snapshot(epoch, options).await
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            (*self.0).new_vector_writer(options)
        }
    }
1535}