// risingwave_storage/store_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23    HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46    SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52    ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
56static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
57    Box::new(PrometheusMetricsRegistry::new(
58        GLOBAL_METRICS_REGISTRY.clone(),
59    ))
60});
61
62mod opaque_type {
63    use super::*;
64
65    pub type HummockStorageType = impl StateStore + AsHummock;
66    pub type MemoryStateStoreType = impl StateStore + AsHummock;
67    pub type SledStateStoreType = impl StateStore + AsHummock;
68
69    #[define_opaque(MemoryStateStoreType)]
70    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
71        may_dynamic_dispatch(state_store)
72    }
73
74    #[define_opaque(HummockStorageType)]
75    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
76        may_dynamic_dispatch(may_verify(state_store))
77    }
78
79    #[define_opaque(SledStateStoreType)]
80    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
81        may_dynamic_dispatch(state_store)
82    }
83}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
/// A store wrapped in `MonitoredStateStore` (built by `monitored` with storage metrics). With
/// the `hm-trace` feature, a `TracedStateStore` layer sits between the metrics wrapper and `S`.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// A store wrapped in `MonitoredStateStore` (no tracing layer without the `hm-trace` feature).
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
92
93fn monitored<S: StateStore>(
94    state_store: S,
95    storage_metrics: Arc<MonitoredStorageMetrics>,
96) -> Monitored<S> {
97    let inner = {
98        #[cfg(feature = "hm-trace")]
99        {
100            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
101        }
102        #[cfg(not(feature = "hm-trace"))]
103        {
104            state_store
105        }
106    };
107    inner.monitored(storage_metrics)
108}
109
110fn inner<S>(state_store: &Monitored<S>) -> &S {
111    let inner = state_store.inner();
112    {
113        #[cfg(feature = "hm-trace")]
114        {
115            inner.inner()
116        }
117        #[cfg(not(feature = "hm-trace"))]
118        {
119            inner
120        }
121    }
122}
123
/// The type erased [`StateStore`].
///
/// Every variant holds a `Monitored<_>` store, i.e. one wrapped with storage metrics (and with
/// tracing when the `hm-trace` feature is enabled).
#[derive(Clone, EnumAsInner)]
// Every variant name intentionally ends in `StateStore`, which this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock+` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this
    /// state store misses some critical implementation to ensure the correctness of persisting
    /// streaming state (e.g., no `read_epoch` support, no async checkpoint).
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// Sled-backed state store. `dispatch_state_store!` treats this variant as unimplemented in
    /// release builds, so it is presumably meant for debug/test use only — confirm.
    SledStateStore(Monitored<SledStateStoreType>),
}
144
/// Optionally erases the concrete type of `state_store`.
///
/// * Release builds (`not(debug_assertions)`): returns `state_store` unchanged.
/// * Debug builds: moves `state_store` into an `Arc` and wraps it in `StateStorePointer`.
///
/// NOTE(review): the `as _` cast presumably unsizes the `Arc` into a `dyn` state-store trait
/// object (dynamic dispatch, limiting monomorphization in debug builds) — confirm against
/// `dyn_state_store::StateStorePointer`.
fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use crate::store_impl::dyn_state_store::StateStorePointer;
        // The cast target is inferred from `StateStorePointer`'s field type.
        StateStorePointer(Arc::new(state_store) as _)
    }
}
156
157fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
158    #[cfg(not(debug_assertions))]
159    {
160        state_store
161    }
162    #[cfg(debug_assertions)]
163    {
164        use std::marker::PhantomData;
165
166        use risingwave_common::util::env_var::env_var_is_true;
167        use tracing::info;
168
169        use crate::store_impl::verify::VerifyStateStore;
170
171        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
172            info!("enable verify state store");
173            Some(SledStateStore::new_temp())
174        } else {
175            info!("verify state store is not enabled");
176            None
177        };
178        VerifyStateStore {
179            actual: state_store,
180            expected,
181            _phantom: PhantomData::<()>,
182        }
183    }
184}
185
186impl StateStoreImpl {
187    fn in_memory(
188        state_store: MemoryStateStore,
189        storage_metrics: Arc<MonitoredStorageMetrics>,
190    ) -> Self {
191        // The specific type of MemoryStateStoreType in deducted here.
192        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193    }
194
195    pub fn hummock(
196        state_store: HummockStorage,
197        storage_metrics: Arc<MonitoredStorageMetrics>,
198    ) -> Self {
199        // The specific type of HummockStateStoreType in deducted here.
200        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201    }
202
203    pub fn sled(
204        state_store: SledStateStore,
205        storage_metrics: Arc<MonitoredStorageMetrics>,
206    ) -> Self {
207        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208    }
209
210    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212    }
213
214    pub fn for_test() -> Self {
215        Self::in_memory(
216            MemoryStateStore::new(),
217            Arc::new(MonitoredStorageMetrics::unused()),
218        )
219    }
220
221    pub fn as_hummock(&self) -> Option<&HummockStorage> {
222        match self {
223            StateStoreImpl::HummockStateStore(hummock) => {
224                Some(inner(hummock).as_hummock().expect("should be hummock"))
225            }
226            _ => None,
227        }
228    }
229}
230
231impl Debug for StateStoreImpl {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237        }
238    }
239}
240
/// Matches on a `StateStoreImpl`, binds the matched variant's wrapped store to `$store`, and
/// evaluates `$body` with it.
///
/// In release builds (`not(debug_assertions)`), only the Hummock arm evaluates `$body`. The
/// memory and sled arms call `unimplemented!` instead, so `$body` is not instantiated for them.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling the memory backend in release mode will
                // cause monomorphization explosion and thus slow compile time.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Presumably consumes the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling the sled backend in release mode will
                // cause monomorphization explosion and thus slow compile time.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Presumably consumes the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
279
/// Verification wrapper, compiled only under `debug_assertions`, `test`, or the `test` feature.
///
/// `VerifyStateStore` forwards operations to an `actual` store and, when present, to an
/// `expected` store, asserting that their read results agree.
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::Deref;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::array::VectorRef;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
    use tracing::log::warn;

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;

    /// Asserts that two results match: both `Ok` with equal values, or both `Err` (the errors
    /// themselves are not compared). Panics otherwise. Currently unused.
    #[expect(dead_code)]
    fn assert_result_eq<Item: PartialEq + Debug, E>(
        first: &std::result::Result<Item, E>,
        second: &std::result::Result<Item, E>,
    ) {
        match (first, second) {
            (Ok(first), Ok(second)) => {
                if first != second {
                    warn!("result different: {:?} {:?}", first, second);
                }
                assert_eq!(first, second);
            }
            (Err(_), Err(_)) => {}
            _ => {
                warn!("one success and one failed");
                panic!("result not equal");
            }
        }
    }

    /// Pairs an `actual` store (or iterator) with an optional `expected` one.
    ///
    /// Most operations are issued to both sides. Read results are compared with `assert_eq!`,
    /// and the `actual` result is returned. Vector search (`nearest`), `try_wait_epoch`, and
    /// `new_vector_writer` go to `actual` only.
    ///
    /// `T` is a type marker: `()` for stores, or the iterator item type when this struct wraps
    /// a pair of iterators (see `verify_iter`).
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The side whose results are returned to callers.
        pub actual: A,
        /// Reference side checked against `actual`; `None` disables verification.
        pub expected: Option<E>,
        pub _phantom: PhantomData<T>,
    }

    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        // Delegates to `actual` only.
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }

    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            // Copy the entry out as owned data so it can be compared with the expected side
            // before `on_key_value_fn` is applied to it.
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                // Compares the pure epoch alongside the full (key, value) entry.
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        // Not verified: vector search is forwarded to `actual` only.
        fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> impl StorageFuture<'a, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }

    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        type Iter = impl StateStoreReadIter;
        type RevIter = impl StateStoreReadIter;

        // TODO: switch to a plain `async fn` once the rustc bug is fixed; currently that form
        // fails to compile.
        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                // Each `try_next` on the returned iterator compares both sides.
                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }

    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
        type ChangeLogIter = impl StateStoreReadChangeLogIter;

        // Both sides must agree on the next epoch.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
            if let Some(expected) = &self.expected {
                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
            }
            Ok(actual)
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<Self::ChangeLogIter> {
            let actual = self
                .actual
                .iter_log(epoch_range, key_range.clone(), options.clone())
                .await?;
            let expected = if let Some(expected) = &self.expected {
                Some(expected.iter_log(epoch_range, key_range, options).await?)
            } else {
                None
            };

            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
        }
    }

    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        // Advances both iterators in lockstep and asserts they yield equal items; the
        // `actual` item is returned.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }

    /// Pairs an `actual` iterator with an optional `expected` one so that every `try_next`
    /// compares their items.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }

    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        type Iter<'a> = impl StateStoreIter + 'a;
        type RevIter<'a> = impl StateStoreIter + 'a;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        // Writes are applied to `expected` first, then to `actual`.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        // Both sides must return the same previous bitmap.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }

    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
        for VerifyStateStore<A, E>
    {
        // Only `actual`'s flushed size is returned; `expected`'s is discarded.
        async fn flush(&mut self) -> StorageResult<usize> {
            if let Some(expected) = &mut self.expected {
                expected.flush().await?;
            }
            self.actual.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.try_flush().await?;
            }
            self.actual.try_flush().await
        }

        // Unlike the write paths above, `actual` is initialized before `expected`.
        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.actual.init(options.clone()).await?;
            if let Some(expected) = &mut self.expected {
                expected.init(options).await?;
            }
            Ok(())
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            if let Some(expected) = &mut self.expected {
                expected.seal_current_epoch(next_epoch, opts.clone());
            }
            self.actual.seal_current_epoch(next_epoch, opts);
        }
    }

    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
        type Local = VerifyStateStore<A::Local, E::Local>;
        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
        // Vector writes are not verified; `actual`'s writer is used directly.
        type VectorWriter = A::VectorWriter;

        // Waits on `actual` only.
        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.actual.try_wait_epoch(epoch, options)
        }

        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_local(option.clone()).await)
            } else {
                None
            };
            VerifyStateStore {
                actual: self.actual.new_local(option).await,
                expected,
                _phantom: PhantomData::<()>,
            }
        }

        // `options` is reused for `actual` after `expected`; this relies on
        // `NewReadSnapshotOptions` being `Copy` (as the original compiles).
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_read_snapshot(epoch, options).await?)
            } else {
                None
            };
            Ok(VerifyStateStore {
                actual: self.actual.new_read_snapshot(epoch, options).await?,
                expected,
                _phantom: PhantomData::<()>,
            })
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            self.actual.new_vector_writer(options)
        }
    }

    // Exposes the `actual` store's methods directly on the wrapper.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
}
681
682impl StateStoreImpl {
683    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
684    #[allow(clippy::too_many_arguments)]
685    #[expect(clippy::borrowed_box)]
686    pub async fn new(
687        s: &str,
688        opts: Arc<StorageOpts>,
689        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
690        state_store_metrics: Arc<HummockStateStoreMetrics>,
691        object_store_metrics: Arc<ObjectStoreMetrics>,
692        storage_metrics: Arc<MonitoredStorageMetrics>,
693        compactor_metrics: Arc<CompactorMetrics>,
694        await_tree_config: Option<await_tree::Config>,
695        use_new_object_prefix_strategy: bool,
696    ) -> StorageResult<Self> {
697        const KB: usize = 1 << 10;
698        const MB: usize = 1 << 20;
699
700        let meta_cache = {
701            let mut builder = HybridCacheBuilder::new()
702                .with_name("foyer.meta")
703                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
704                .memory(opts.meta_cache_capacity_mb * MB)
705                .with_shards(opts.meta_cache_shard_num)
706                .with_eviction_config(opts.meta_cache_eviction_config.clone())
707                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
708                    u64::BITS as usize / 8 + value.estimate_size()
709                })
710                .storage();
711
712            if !opts.meta_file_cache_dir.is_empty() {
713                if let Err(e) = Feature::ElasticDiskCache.check_available() {
714                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
715                } else {
716                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
717                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
718                        .with_throttle(opts.meta_file_cache_throttle.clone())
719                        .build()
720                        .map_err(HummockError::foyer_io_error)?;
721                    let engine_builder = BlockEngineBuilder::new(device)
722                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
723                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
724                        .with_flushers(opts.meta_file_cache_flushers)
725                        .with_reclaimers(opts.meta_file_cache_reclaimers)
726                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
727                        .with_clean_block_threshold(
728                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
729                        )
730                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
731                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
732                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
733                            opts.meta_file_cache_fifo_probation_ratio,
734                        ))]);
735                    builder = builder.with_engine_config(engine_builder);
736                }
737            }
738
739            builder.build().await.map_err(HummockError::foyer_error)?
740        };
741
742        let block_cache = {
743            let mut builder = HybridCacheBuilder::new()
744                .with_name("foyer.data")
745                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
746                .with_event_listener(Arc::new(BlockCacheEventListener::new(
747                    state_store_metrics.clone(),
748                )))
749                .memory(opts.block_cache_capacity_mb * MB)
750                .with_shards(opts.block_cache_shard_num)
751                .with_eviction_config(opts.block_cache_eviction_config.clone())
752                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
753                    // FIXME(MrCroxx): Calculate block weight more accurately.
754                    u64::BITS as usize * 2 / 8 + value.raw().len()
755                })
756                .storage();
757
758            if !opts.data_file_cache_dir.is_empty() {
759                if let Err(e) = Feature::ElasticDiskCache.check_available() {
760                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
761                } else {
762                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
763                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
764                        .with_throttle(opts.data_file_cache_throttle.clone())
765                        .build()
766                        .map_err(HummockError::foyer_io_error)?;
767                    let engine_builder = BlockEngineBuilder::new(device)
768                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
769                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
770                        .with_flushers(opts.data_file_cache_flushers)
771                        .with_reclaimers(opts.data_file_cache_reclaimers)
772                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
773                        .with_clean_block_threshold(
774                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
775                        )
776                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
777                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
778                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
779                            opts.data_file_cache_fifo_probation_ratio,
780                        ))]);
781                    builder = builder.with_engine_config(engine_builder);
782                }
783            }
784
785            builder.build().await.map_err(HummockError::foyer_error)?
786        };
787
788        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
789            .with_shards(opts.vector_meta_cache_shard_num)
790            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
791            .build();
792
793        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
794            .with_shards(opts.vector_block_cache_shard_num)
795            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
796            .build();
797
798        let recent_filter = if opts.data_file_cache_dir.is_empty() {
799            Arc::new(NoneRecentFilter::default().into())
800        } else if opts.cache_refill_recent_filter_shards == 1 {
801            Arc::new(
802                SimpleRecentFilter::new(
803                    opts.cache_refill_recent_filter_layers,
804                    Duration::from_millis(
805                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
806                    ),
807                )
808                .into(),
809            )
810        } else if opts.cache_refill_skip_recent_filter {
811            Arc::new(AllRecentFilter::default().into())
812        } else {
813            Arc::new(
814                ShardedRecentFilter::new(
815                    opts.cache_refill_recent_filter_layers,
816                    Duration::from_millis(
817                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
818                    ),
819                    opts.cache_refill_recent_filter_shards,
820                )
821                .into(),
822            )
823        };
824
825        let store = match s {
826            hummock if hummock.starts_with("hummock+") => {
827                let object_store = build_remote_object_store(
828                    hummock.strip_prefix("hummock+").unwrap(),
829                    object_store_metrics.clone(),
830                    "Hummock",
831                    Arc::new(opts.object_store_config.clone()),
832                )
833                .await;
834
835                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
836                    store: Arc::new(object_store),
837                    path: opts.data_directory.clone(),
838                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
839                    max_prefetch_block_number: opts.max_prefetch_block_number,
840                    recent_filter,
841                    state_store_metrics: state_store_metrics.clone(),
842                    use_new_object_prefix_strategy,
843
844                    meta_cache,
845                    block_cache,
846                    vector_meta_cache,
847                    vector_block_cache,
848                }));
849                let notification_client =
850                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
851                let compaction_catalog_manager_ref =
852                    Arc::new(CompactionCatalogManager::new(Box::new(
853                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
854                    )));
855
856                let inner = HummockStorage::new(
857                    opts.clone(),
858                    sstable_store,
859                    hummock_meta_client.clone(),
860                    notification_client,
861                    compaction_catalog_manager_ref,
862                    state_store_metrics.clone(),
863                    compactor_metrics.clone(),
864                    await_tree_config,
865                )
866                .await?;
867
868                StateStoreImpl::hummock(inner, storage_metrics)
869            }
870
871            "in_memory" | "in-memory" => {
872                tracing::warn!(
873                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
874                );
875                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
876            }
877
878            sled if sled.starts_with("sled://") => {
879                tracing::warn!(
880                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
881                );
882                let path = sled.strip_prefix("sled://").unwrap();
883                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
884            }
885
886            other => unimplemented!("{} state store is not supported", other),
887        };
888
889        Ok(store)
890    }
891}
892
893pub trait AsHummock: Send + Sync {
894    fn as_hummock(&self) -> Option<&HummockStorage>;
895
896    fn sync(
897        &self,
898        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
899    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
900        async move {
901            if let Some(hummock) = self.as_hummock() {
902                hummock.sync(sync_table_epochs).await
903            } else {
904                Ok(SyncResult::default())
905            }
906        }
907        .boxed()
908    }
909}
910
911impl AsHummock for HummockStorage {
912    fn as_hummock(&self) -> Option<&HummockStorage> {
913        Some(self)
914    }
915}
916
917impl AsHummock for MemoryStateStore {
918    fn as_hummock(&self) -> Option<&HummockStorage> {
919        None
920    }
921}
922
923impl AsHummock for SledStateStore {
924    fn as_hummock(&self) -> Option<&HummockStorage> {
925        None
926    }
927}
928
929#[cfg(debug_assertions)]
930mod dyn_state_store {
931    use std::future::Future;
932    use std::ops::DerefMut;
933    use std::sync::Arc;
934
935    use bytes::Bytes;
936    use risingwave_common::array::VectorRef;
937    use risingwave_common::bitmap::Bitmap;
938    use risingwave_common::hash::VirtualNode;
939    use risingwave_hummock_sdk::HummockReadEpoch;
940    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
941
942    use crate::error::StorageResult;
943    use crate::hummock::HummockStorage;
944    use crate::store::*;
945    use crate::store_impl::AsHummock;
946    use crate::vector::VectorDistance;
947
    /// Object-safe counterpart of [`StateStoreIter`]; `async_trait` boxes the returned
    /// future so the iterator can be used behind `dyn`.
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }

    /// Every [`StateStoreIter`] is a [`DynStateStoreIter`] by forwarding to its own
    /// `try_next` (resolved through the `I: StateStoreIter<T>` bound).
    #[async_trait::async_trait]
    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            self.try_next().await
        }
    }

    /// Boxed, type-erased iterator that may borrow data for `'a`.
    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            // Deref to the boxed trait object explicitly. Because this impl makes the `Box`
            // a `StateStoreIter`, the blanket impl above also makes the `Box` itself a
            // `DynStateStoreIter`; going through `deref_mut()` makes sure the call is
            // dispatched to the inner object rather than back into the `Box`'s impls.
            self.deref_mut().try_next()
        }
    }
968
969    // For StateStoreRead
970
971    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
972    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
973
974    #[async_trait::async_trait]
975    pub trait DynStateStoreGet: StaticSendSync {
976        async fn get_keyed_row(
977            &self,
978            key: TableKey<Bytes>,
979            read_options: ReadOptions,
980        ) -> StorageResult<Option<StateStoreKeyedRow>>;
981    }
982
983    #[async_trait::async_trait]
984    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
985        async fn iter(
986            &self,
987            key_range: TableKeyRange,
988
989            read_options: ReadOptions,
990        ) -> StorageResult<BoxStateStoreReadIter>;
991
992        async fn rev_iter(
993            &self,
994            key_range: TableKeyRange,
995
996            read_options: ReadOptions,
997        ) -> StorageResult<BoxStateStoreReadIter>;
998    }
999
1000    #[async_trait::async_trait]
1001    pub trait DynStateStoreReadLog: StaticSendSync {
1002        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
1003        async fn iter_log(
1004            &self,
1005            epoch_range: (u64, u64),
1006            key_range: TableKeyRange,
1007            options: ReadLogOptions,
1008        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
1009    }
1010
1011    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1012
1013    #[async_trait::async_trait]
1014    impl<S: StateStoreGet> DynStateStoreGet for S {
1015        async fn get_keyed_row(
1016            &self,
1017            key: TableKey<Bytes>,
1018            read_options: ReadOptions,
1019        ) -> StorageResult<Option<StateStoreKeyedRow>> {
1020            self.on_key_value(key, read_options, move |key, value| {
1021                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1022            })
1023            .await
1024        }
1025    }
1026
1027    #[async_trait::async_trait]
1028    impl<S: StateStoreRead> DynStateStoreRead for S {
1029        async fn iter(
1030            &self,
1031            key_range: TableKeyRange,
1032
1033            read_options: ReadOptions,
1034        ) -> StorageResult<BoxStateStoreReadIter> {
1035            Ok(Box::new(self.iter(key_range, read_options).await?))
1036        }
1037
1038        async fn rev_iter(
1039            &self,
1040            key_range: TableKeyRange,
1041
1042            read_options: ReadOptions,
1043        ) -> StorageResult<BoxStateStoreReadIter> {
1044            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1045        }
1046    }
1047
1048    #[async_trait::async_trait]
1049    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1050        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1051            self.next_epoch(epoch, options).await
1052        }
1053
1054        async fn iter_log(
1055            &self,
1056            epoch_range: (u64, u64),
1057            key_range: TableKeyRange,
1058            options: ReadLogOptions,
1059        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1060            Ok(Box::new(
1061                self.iter_log(epoch_range, key_range, options).await?,
1062            ))
1063        }
1064    }
1065
1066    // For LocalStateStore
1067    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1068    #[async_trait::async_trait]
1069    pub trait DynLocalStateStore:
1070        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
1071    {
1072        async fn iter(
1073            &self,
1074            key_range: TableKeyRange,
1075            read_options: ReadOptions,
1076        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1077
1078        async fn rev_iter(
1079            &self,
1080            key_range: TableKeyRange,
1081            read_options: ReadOptions,
1082        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1083
1084        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1085
1086        fn insert(
1087            &mut self,
1088            key: TableKey<Bytes>,
1089            new_val: Bytes,
1090            old_val: Option<Bytes>,
1091        ) -> StorageResult<()>;
1092
1093        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1094
1095        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1096
1097        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1098    }
1099
1100    #[async_trait::async_trait]
1101    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1102        async fn flush(&mut self) -> StorageResult<usize>;
1103
1104        async fn try_flush(&mut self) -> StorageResult<()>;
1105
1106        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1107
1108        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1109    }
1110
1111    #[async_trait::async_trait]
1112    impl<S: LocalStateStore> DynLocalStateStore for S {
1113        async fn iter(
1114            &self,
1115            key_range: TableKeyRange,
1116            read_options: ReadOptions,
1117        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1118            Ok(Box::new(self.iter(key_range, read_options).await?))
1119        }
1120
1121        async fn rev_iter(
1122            &self,
1123            key_range: TableKeyRange,
1124            read_options: ReadOptions,
1125        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1126            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1127        }
1128
1129        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1130            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1131        }
1132
1133        fn insert(
1134            &mut self,
1135            key: TableKey<Bytes>,
1136            new_val: Bytes,
1137            old_val: Option<Bytes>,
1138        ) -> StorageResult<()> {
1139            self.insert(key, new_val, old_val)
1140        }
1141
1142        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1143            self.delete(key, old_val)
1144        }
1145
1146        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1147            self.update_vnode_bitmap(vnodes).await
1148        }
1149
1150        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1151            self.get_table_watermark(vnode)
1152        }
1153    }
1154
1155    #[async_trait::async_trait]
1156    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1157        async fn flush(&mut self) -> StorageResult<usize> {
1158            self.flush().await
1159        }
1160
1161        async fn try_flush(&mut self) -> StorageResult<()> {
1162            self.try_flush().await
1163        }
1164
1165        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1166            self.init(options).await
1167        }
1168
1169        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1170            self.seal_current_epoch(next_epoch, opts)
1171        }
1172    }
1173
1174    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1175
1176    impl LocalStateStore for BoxDynLocalStateStore {
1177        type FlushedSnapshotReader = StateStoreReadDynRef;
1178        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1179        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1180
1181        fn iter(
1182            &self,
1183            key_range: TableKeyRange,
1184            read_options: ReadOptions,
1185        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1186            (*self.0).iter(key_range, read_options)
1187        }
1188
1189        fn rev_iter(
1190            &self,
1191            key_range: TableKeyRange,
1192            read_options: ReadOptions,
1193        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1194            (*self.0).rev_iter(key_range, read_options)
1195        }
1196
1197        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1198            (*self.0).new_flushed_snapshot_reader()
1199        }
1200
1201        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1202            (*self.0).get_table_watermark(vnode)
1203        }
1204
1205        fn insert(
1206            &mut self,
1207            key: TableKey<Bytes>,
1208            new_val: Bytes,
1209            old_val: Option<Bytes>,
1210        ) -> StorageResult<()> {
1211            (*self.0).insert(key, new_val, old_val)
1212        }
1213
1214        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1215            (*self.0).delete(key, old_val)
1216        }
1217
1218        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1219            (*self.0).update_vnode_bitmap(vnodes).await
1220        }
1221    }
1222
    /// [`StateStoreWriteEpochControl`] for any `StateStorePointer` that can lend out a
    /// `&mut dyn DynStateStoreWriteEpochControl` (the `AsMut` impls are generated by
    /// `state_store_pointer_dyn_as_mut!` in this module). Every method forwards to that
    /// trait object.
    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
    where
        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
    {
        // `self.as_mut()` is pinned to the `AsMut<dyn DynStateStoreWriteEpochControl>`
        // bound in the where clause, so the following call is the dyn trait's method.
        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
            self.as_mut().flush()
        }

        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().try_flush()
        }

        fn init(
            &mut self,
            options: InitOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().init(options)
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.as_mut().seal_current_epoch(next_epoch, opts)
        }
    }
1246
1247    #[async_trait::async_trait]
1248    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1249        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
1250    }
1251
1252    #[async_trait::async_trait]
1253    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1254        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1255            self.insert(vec, info)
1256        }
1257    }
1258
1259    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1260
1261    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1262        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1263            self.0.insert(vec, info)
1264        }
1265    }
1266
1267    // For global StateStore
1268
1269    #[async_trait::async_trait]
1270    pub trait DynStateStoreReadVector: StaticSendSync {
1271        async fn nearest(
1272            &self,
1273            vec: VectorRef<'_>,
1274            options: VectorNearestOptions,
1275        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1276    }
1277
1278    #[async_trait::async_trait]
1279    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1280        async fn nearest(
1281            &self,
1282            vec: VectorRef<'_>,
1283            options: VectorNearestOptions,
1284        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1285            use risingwave_common::types::ScalarRef;
1286            self.nearest(vec, options, |vec, distance, info| {
1287                (
1288                    vec.to_owned_scalar(),
1289                    distance,
1290                    Bytes::copy_from_slice(info),
1291                )
1292            })
1293            .await
1294        }
1295    }
1296
    /// [`StateStoreReadVector`] for any `StateStorePointer` that exposes a
    /// `dyn DynStateStoreReadVector`. The search runs through the object-safe trait, which
    /// returns owned results, and `on_nearest_item_fn` is then applied to each of them.
    impl<P> StateStoreReadVector for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
    {
        async fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> StorageResult<Vec<O>> {
            let output = self.as_ref().nearest(vec, options).await?;
            // Re-borrow each owned `(vector, distance, info)` tuple so the callback receives
            // borrowed arguments, as its `OnNearestItemFn` signature expects.
            Ok(output
                .into_iter()
                .map(|(vec, distance, info)| {
                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
                })
                .collect())
        }
    }
1316
1317    pub trait DynStateStoreReadSnapshot:
1318        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1319    {
1320    }
1321
1322    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1323        for S
1324    {
1325    }
1326
1327    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1328    #[async_trait::async_trait]
1329    pub trait DynStateStoreExt: StaticSendSync {
1330        async fn try_wait_epoch(
1331            &self,
1332            epoch: HummockReadEpoch,
1333            options: TryWaitEpochOptions,
1334        ) -> StorageResult<()>;
1335
1336        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1337        async fn new_read_snapshot(
1338            &self,
1339            epoch: HummockReadEpoch,
1340            options: NewReadSnapshotOptions,
1341        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1342        async fn new_vector_writer(
1343            &self,
1344            options: NewVectorWriterOptions,
1345        ) -> BoxDynStateStoreWriteVector;
1346    }
1347
1348    #[async_trait::async_trait]
1349    impl<S: StateStore> DynStateStoreExt for S {
1350        async fn try_wait_epoch(
1351            &self,
1352            epoch: HummockReadEpoch,
1353            options: TryWaitEpochOptions,
1354        ) -> StorageResult<()> {
1355            self.try_wait_epoch(epoch, options).await
1356        }
1357
1358        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1359            StateStorePointer(Box::new(self.new_local(option).await))
1360        }
1361
1362        async fn new_read_snapshot(
1363            &self,
1364            epoch: HummockReadEpoch,
1365            options: NewReadSnapshotOptions,
1366        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1367            Ok(StateStorePointer(Arc::new(
1368                self.new_read_snapshot(epoch, options).await?,
1369            )))
1370        }
1371
1372        async fn new_vector_writer(
1373            &self,
1374            options: NewVectorWriterOptions,
1375        ) -> BoxDynStateStoreWriteVector {
1376            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1377        }
1378    }
1379
    /// Shared, type-erased handle to a full [`DynStateStore`].
    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;

    /// Generates `AsRef<dyn $target_dyn_trait>` for
    /// `StateStorePointer<$pointer<dyn $source_dyn_trait>>` by dereferencing the pointer and
    /// converting the pointee to `&dyn $target_dyn_trait`. In every invocation below, the
    /// target is either the source trait itself or one of its supertraits.
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);

    /// Mutable counterpart of `state_store_pointer_dyn_as_ref!`: generates
    /// `AsMut<dyn $target_dyn_trait>` for `StateStorePointer<$pointer<dyn $source_dyn_trait>>`.
    /// Only usable with pointers that allow `&mut` access to the pointee (the invocations
    /// below use `Box`).
    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );

    /// Newtype around a (smart) pointer to a type-erased store. The storage traits are
    /// implemented on it generically, keyed on which `AsRef`/`AsMut<dyn ...>` impls the
    /// wrapped pointer type has, or directly for specific aliases such as
    /// [`StateStoreDynRef`] and [`BoxDynLocalStateStore`].
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);
1421
    /// [`StateStoreGet`] for any `StateStorePointer` exposing a `dyn DynStateStoreGet`.
    /// The object-safe lookup returns an owned row, which is re-borrowed for the callback.
    impl<P> StateStoreGet for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
    {
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            let option = self.as_ref().get_keyed_row(key, read_options).await?;
            // `Option<StorageResult<O>>` -> `StorageResult<Option<O>>`: a missing key is
            // `Ok(None)`, while an error returned by the callback is propagated.
            option
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    /// [`StateStoreRead`] for any `StateStorePointer` exposing a `dyn DynStateStoreRead`.
    /// The boxed iterators returned by the object-safe trait are used as-is.
    impl<P> StateStoreRead for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
    {
        type Iter = BoxStateStoreReadIter;
        type RevIter = BoxStateStoreReadIter;

        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            self.as_ref().iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            self.as_ref().rev_iter(key_range, read_options)
        }
    }
1464
1465    impl StateStoreReadLog for StateStoreDynRef {
1466        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1467
1468        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1469            (*self.0).next_epoch(epoch, options).await
1470        }
1471
1472        fn iter_log(
1473            &self,
1474            epoch_range: (u64, u64),
1475            key_range: TableKeyRange,
1476            options: ReadLogOptions,
1477        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1478            (*self.0).iter_log(epoch_range, key_range, options)
1479        }
1480    }
1481
1482    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1483
1484    impl AsHummock for StateStoreDynRef {
1485        fn as_hummock(&self) -> Option<&HummockStorage> {
1486            (*self.0).as_hummock()
1487        }
1488    }
1489
1490    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1491
1492    impl StateStore for StateStoreDynRef {
1493        type Local = BoxDynLocalStateStore;
1494        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1495        type VectorWriter = BoxDynStateStoreWriteVector;
1496
1497        fn try_wait_epoch(
1498            &self,
1499            epoch: HummockReadEpoch,
1500            options: TryWaitEpochOptions,
1501        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1502            (*self.0).try_wait_epoch(epoch, options)
1503        }
1504
1505        fn new_local(
1506            &self,
1507            option: NewLocalOptions,
1508        ) -> impl Future<Output = Self::Local> + Send + '_ {
1509            (*self.0).new_local(option)
1510        }
1511
1512        async fn new_read_snapshot(
1513            &self,
1514            epoch: HummockReadEpoch,
1515            options: NewReadSnapshotOptions,
1516        ) -> StorageResult<Self::ReadSnapshot> {
1517            (*self.0).new_read_snapshot(epoch, options).await
1518        }
1519
1520        fn new_vector_writer(
1521            &self,
1522            options: NewVectorWriterOptions,
1523        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1524            (*self.0).new_vector_writer(options)
1525        }
1526    }
1527}