risingwave_storage/
store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23    HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46    SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52    ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
56static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
57    Box::new(PrometheusMetricsRegistry::new(
58        GLOBAL_METRICS_REGISTRY.clone(),
59    ))
60});
61
/// Opaque (`impl Trait`) aliases for the concrete store types held by [`StateStoreImpl`].
///
/// Each alias gets its hidden type from exactly one `#[define_opaque]` constructor below.
/// Outside this module only the `StateStore + AsHummock` bounds are visible. The optional
/// wrapper layers (`may_verify`, `may_dynamic_dispatch`) therefore never leak into the types
/// that callers name.
mod opaque_type {
    use super::*;

    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Defines `MemoryStateStoreType`: the in-memory store behind the optional
    /// dynamic-dispatch layer.
    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Defines `HummockStorageType`: Hummock storage wrapped first by the optional verify
    /// layer and then by the optional dynamic-dispatch layer. Hummock is the only backend
    /// wrapped by `may_verify`.
    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Defines `SledStateStoreType`: the sled store behind the optional dynamic-dispatch layer.
    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
/// Wrapper stack produced by `monitored`. With the `hm-trace` feature a `TracedStateStore`
/// sits between the monitoring layer and the store itself.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Wrapper stack produced by `monitored`. Without `hm-trace` there is no tracing layer.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
92
93fn monitored<S: StateStore>(
94    state_store: S,
95    storage_metrics: Arc<MonitoredStorageMetrics>,
96) -> Monitored<S> {
97    let inner = {
98        #[cfg(feature = "hm-trace")]
99        {
100            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
101        }
102        #[cfg(not(feature = "hm-trace"))]
103        {
104            state_store
105        }
106    };
107    inner.monitored(storage_metrics)
108}
109
110fn inner<S>(state_store: &Monitored<S>) -> &S {
111    let inner = state_store.inner();
112    {
113        #[cfg(feature = "hm-trace")]
114        {
115            inner.inner()
116        }
117        #[cfg(not(feature = "hm-trace"))]
118        {
119            inner
120        }
121    }
122}
123
/// The type erased [`StateStore`].
///
/// Every variant holds its store wrapped by `monitored`. Use the `dispatch_state_store!`
/// macro to run code that is generic over the concrete store type of the active variant.
#[derive(Clone, EnumAsInner)]
// Every variant name ends in `StateStore`, which clippy would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this
    /// state store misses some critical implementation to ensure the correctness of persisting
    /// streaming state. (e.g., no `read_epoch` support, no async checkpoint)
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// State store backed by `SledStateStore`. Like the in-memory variant, it is only usable
    /// in debug builds. In release builds, `dispatch_state_store!` reaches `unimplemented!`
    /// for this variant.
    SledStateStore(Monitored<SledStateStoreType>),
}
144
145fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
146    #[cfg(not(debug_assertions))]
147    {
148        state_store
149    }
150    #[cfg(debug_assertions)]
151    {
152        use crate::store_impl::dyn_state_store::StateStorePointer;
153        StateStorePointer(Arc::new(state_store) as _)
154    }
155}
156
157fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
158    #[cfg(not(debug_assertions))]
159    {
160        state_store
161    }
162    #[cfg(debug_assertions)]
163    {
164        use std::marker::PhantomData;
165
166        use risingwave_common::util::env_var::env_var_is_true;
167        use tracing::info;
168
169        use crate::store_impl::verify::VerifyStateStore;
170
171        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
172            info!("enable verify state store");
173            Some(SledStateStore::new_temp())
174        } else {
175            info!("verify state store is not enabled");
176            None
177        };
178        VerifyStateStore {
179            actual: state_store,
180            expected,
181            _phantom: PhantomData::<()>,
182        }
183    }
184}
185
186impl StateStoreImpl {
187    fn in_memory(
188        state_store: MemoryStateStore,
189        storage_metrics: Arc<MonitoredStorageMetrics>,
190    ) -> Self {
191        // The specific type of MemoryStateStoreType in deducted here.
192        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193    }
194
195    pub fn hummock(
196        state_store: HummockStorage,
197        storage_metrics: Arc<MonitoredStorageMetrics>,
198    ) -> Self {
199        // The specific type of HummockStateStoreType in deducted here.
200        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201    }
202
203    pub fn sled(
204        state_store: SledStateStore,
205        storage_metrics: Arc<MonitoredStorageMetrics>,
206    ) -> Self {
207        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208    }
209
210    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212    }
213
214    pub fn for_test() -> Self {
215        Self::in_memory(
216            MemoryStateStore::new(),
217            Arc::new(MonitoredStorageMetrics::unused()),
218        )
219    }
220
221    pub fn as_hummock(&self) -> Option<&HummockStorage> {
222        match self {
223            StateStoreImpl::HummockStateStore(hummock) => {
224                Some(inner(hummock).as_hummock().expect("should be hummock"))
225            }
226            _ => None,
227        }
228    }
229}
230
231impl Debug for StateStoreImpl {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237        }
238    }
239}
240
/// Matches `$impl` against the variants of `StateStoreImpl` and evaluates `$body` with
/// `$store` bound to the payload of the matching variant.
///
/// `$body` is expanded once per match arm, so it is type-checked and monomorphized separately
/// for each concrete store type. In release builds (`debug_assertions` off) only the Hummock
/// arm runs `$body`. The memory and sled arms call `unimplemented!` instead, which keeps
/// release compile times down and makes those backends unusable there.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling memory backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling the sled backend will cause
                // monomorphization explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
279
280#[cfg(any(debug_assertions, test, feature = "test"))]
281pub mod verify {
282    use std::fmt::Debug;
283    use std::future::Future;
284    use std::marker::PhantomData;
285    use std::ops::Deref;
286    use std::sync::Arc;
287
288    use bytes::Bytes;
289    use risingwave_common::bitmap::Bitmap;
290    use risingwave_common::hash::VirtualNode;
291    use risingwave_hummock_sdk::HummockReadEpoch;
292    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
293    use tracing::log::warn;
294
295    use crate::error::StorageResult;
296    use crate::hummock::HummockStorage;
297    use crate::store::*;
298    use crate::store_impl::AsHummock;
299
300    #[expect(dead_code)]
301    fn assert_result_eq<Item: PartialEq + Debug, E>(
302        first: &std::result::Result<Item, E>,
303        second: &std::result::Result<Item, E>,
304    ) {
305        match (first, second) {
306            (Ok(first), Ok(second)) => {
307                if first != second {
308                    warn!("result different: {:?} {:?}", first, second);
309                }
310                assert_eq!(first, second);
311            }
312            (Err(_), Err(_)) => {}
313            _ => {
314                warn!("one success and one failed");
315                panic!("result not equal");
316            }
317        }
318    }
319
320    #[derive(Clone)]
321    pub struct VerifyStateStore<A, E, T = ()> {
322        pub actual: A,
323        pub expected: Option<E>,
324        pub _phantom: PhantomData<T>,
325    }
326
327    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
328        fn as_hummock(&self) -> Option<&HummockStorage> {
329            self.actual.as_hummock()
330        }
331    }
332
333    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
334        async fn on_key_value<O: Send + 'static>(
335            &self,
336            key: TableKey<Bytes>,
337            read_options: ReadOptions,
338            on_key_value_fn: impl KeyValueFn<O>,
339        ) -> StorageResult<Option<O>> {
340            let actual: Option<(FullKey<Bytes>, Bytes)> = self
341                .actual
342                .on_key_value(key.clone(), read_options.clone(), |key, value| {
343                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
344                })
345                .await?;
346            if let Some(expected) = &self.expected {
347                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
348                    .on_key_value(key, read_options, |key, value| {
349                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
350                    })
351                    .await?;
352                assert_eq!(
353                    actual
354                        .as_ref()
355                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
356                    expected
357                        .as_ref()
358                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
359                );
360            }
361
362            actual
363                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
364                .transpose()
365        }
366    }
367
368    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
369        for VerifyStateStore<A, E>
370    {
371        fn nearest<O: Send + 'static>(
372            &self,
373            vec: Vector,
374            options: VectorNearestOptions,
375            on_nearest_item_fn: impl OnNearestItemFn<O>,
376        ) -> impl StorageFuture<'_, Vec<O>> {
377            self.actual.nearest(vec, options, on_nearest_item_fn)
378        }
379    }
380
381    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
382        type Iter = impl StateStoreReadIter;
383        type RevIter = impl StateStoreReadIter;
384
385        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
386        // fail to compile.
387        #[expect(clippy::manual_async_fn)]
388        fn iter(
389            &self,
390            key_range: TableKeyRange,
391
392            read_options: ReadOptions,
393        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
394            async move {
395                let actual = self
396                    .actual
397                    .iter(key_range.clone(), read_options.clone())
398                    .await?;
399                let expected = if let Some(expected) = &self.expected {
400                    Some(expected.iter(key_range, read_options).await?)
401                } else {
402                    None
403                };
404
405                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
406            }
407        }
408
409        #[expect(clippy::manual_async_fn)]
410        fn rev_iter(
411            &self,
412            key_range: TableKeyRange,
413
414            read_options: ReadOptions,
415        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
416            async move {
417                let actual = self
418                    .actual
419                    .rev_iter(key_range.clone(), read_options.clone())
420                    .await?;
421                let expected = if let Some(expected) = &self.expected {
422                    Some(expected.rev_iter(key_range, read_options).await?)
423                } else {
424                    None
425                };
426
427                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
428            }
429        }
430    }
431
432    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
433        type ChangeLogIter = impl StateStoreReadChangeLogIter;
434
435        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
436            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
437            if let Some(expected) = &self.expected {
438                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
439            }
440            Ok(actual)
441        }
442
443        async fn iter_log(
444            &self,
445            epoch_range: (u64, u64),
446            key_range: TableKeyRange,
447            options: ReadLogOptions,
448        ) -> StorageResult<Self::ChangeLogIter> {
449            let actual = self
450                .actual
451                .iter_log(epoch_range, key_range.clone(), options.clone())
452                .await?;
453            let expected = if let Some(expected) = &self.expected {
454                Some(expected.iter_log(epoch_range, key_range, options).await?)
455            } else {
456                None
457            };
458
459            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
460        }
461    }
462
463    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
464        for VerifyStateStore<A, E, T>
465    where
466        for<'a> T::ItemRef<'a>: PartialEq + Debug,
467    {
468        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
469            let actual = self.actual.try_next().await?;
470            if let Some(expected) = self.expected.as_mut() {
471                let expected = expected.try_next().await?;
472                assert_eq!(actual, expected);
473            }
474            Ok(actual)
475        }
476    }
477
478    fn verify_iter<T: IterItem>(
479        actual: impl StateStoreIter<T>,
480        expected: Option<impl StateStoreIter<T>>,
481    ) -> impl StateStoreIter<T>
482    where
483        for<'a> T::ItemRef<'a>: PartialEq + Debug,
484    {
485        VerifyStateStore {
486            actual,
487            expected,
488            _phantom: PhantomData::<T>,
489        }
490    }
491
492    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
493        type FlushedSnapshotReader =
494            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
495
496        type Iter<'a> = impl StateStoreIter + 'a;
497        type RevIter<'a> = impl StateStoreIter + 'a;
498
499        #[expect(clippy::manual_async_fn)]
500        fn iter(
501            &self,
502            key_range: TableKeyRange,
503            read_options: ReadOptions,
504        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
505            async move {
506                let actual = self
507                    .actual
508                    .iter(key_range.clone(), read_options.clone())
509                    .await?;
510                let expected = if let Some(expected) = &self.expected {
511                    Some(expected.iter(key_range, read_options).await?)
512                } else {
513                    None
514                };
515
516                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
517            }
518        }
519
520        #[expect(clippy::manual_async_fn)]
521        fn rev_iter(
522            &self,
523            key_range: TableKeyRange,
524            read_options: ReadOptions,
525        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
526            async move {
527                let actual = self
528                    .actual
529                    .rev_iter(key_range.clone(), read_options.clone())
530                    .await?;
531                let expected = if let Some(expected) = &self.expected {
532                    Some(expected.rev_iter(key_range, read_options).await?)
533                } else {
534                    None
535                };
536
537                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
538            }
539        }
540
541        fn insert(
542            &mut self,
543            key: TableKey<Bytes>,
544            new_val: Bytes,
545            old_val: Option<Bytes>,
546        ) -> StorageResult<()> {
547            if let Some(expected) = &mut self.expected {
548                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
549            }
550            self.actual.insert(key, new_val, old_val)?;
551
552            Ok(())
553        }
554
555        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
556            if let Some(expected) = &mut self.expected {
557                expected.delete(key.clone(), old_val.clone())?;
558            }
559            self.actual.delete(key, old_val)?;
560            Ok(())
561        }
562
563        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
564            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
565            if let Some(expected) = &mut self.expected {
566                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
567            }
568            Ok(ret)
569        }
570
571        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
572            let ret = self.actual.get_table_watermark(vnode);
573            if let Some(expected) = &self.expected {
574                assert_eq!(ret, expected.get_table_watermark(vnode));
575            }
576            ret
577        }
578
579        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
580            VerifyStateStore {
581                actual: self.actual.new_flushed_snapshot_reader(),
582                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
583                _phantom: Default::default(),
584            }
585        }
586    }
587
588    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
589        for VerifyStateStore<A, E>
590    {
591        async fn flush(&mut self) -> StorageResult<usize> {
592            if let Some(expected) = &mut self.expected {
593                expected.flush().await?;
594            }
595            self.actual.flush().await
596        }
597
598        async fn try_flush(&mut self) -> StorageResult<()> {
599            if let Some(expected) = &mut self.expected {
600                expected.try_flush().await?;
601            }
602            self.actual.try_flush().await
603        }
604
605        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
606            self.actual.init(options.clone()).await?;
607            if let Some(expected) = &mut self.expected {
608                expected.init(options).await?;
609            }
610            Ok(())
611        }
612
613        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
614            if let Some(expected) = &mut self.expected {
615                expected.seal_current_epoch(next_epoch, opts.clone());
616            }
617            self.actual.seal_current_epoch(next_epoch, opts);
618        }
619    }
620
621    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
622        type Local = VerifyStateStore<A::Local, E::Local>;
623        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
624        type VectorWriter = A::VectorWriter;
625
626        fn try_wait_epoch(
627            &self,
628            epoch: HummockReadEpoch,
629            options: TryWaitEpochOptions,
630        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
631            self.actual.try_wait_epoch(epoch, options)
632        }
633
634        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
635            let expected = if let Some(expected) = &self.expected {
636                Some(expected.new_local(option.clone()).await)
637            } else {
638                None
639            };
640            VerifyStateStore {
641                actual: self.actual.new_local(option).await,
642                expected,
643                _phantom: PhantomData::<()>,
644            }
645        }
646
647        async fn new_read_snapshot(
648            &self,
649            epoch: HummockReadEpoch,
650            options: NewReadSnapshotOptions,
651        ) -> StorageResult<Self::ReadSnapshot> {
652            let expected = if let Some(expected) = &self.expected {
653                Some(expected.new_read_snapshot(epoch, options).await?)
654            } else {
655                None
656            };
657            Ok(VerifyStateStore {
658                actual: self.actual.new_read_snapshot(epoch, options).await?,
659                expected,
660                _phantom: PhantomData::<()>,
661            })
662        }
663
664        fn new_vector_writer(
665            &self,
666            options: NewVectorWriterOptions,
667        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
668            self.actual.new_vector_writer(options)
669        }
670    }
671
672    impl<A, E> Deref for VerifyStateStore<A, E> {
673        type Target = A;
674
675        fn deref(&self) -> &Self::Target {
676            &self.actual
677        }
678    }
679}
680
681impl StateStoreImpl {
682    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
683    #[allow(clippy::too_many_arguments)]
684    #[expect(clippy::borrowed_box)]
685    pub async fn new(
686        s: &str,
687        opts: Arc<StorageOpts>,
688        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
689        state_store_metrics: Arc<HummockStateStoreMetrics>,
690        object_store_metrics: Arc<ObjectStoreMetrics>,
691        storage_metrics: Arc<MonitoredStorageMetrics>,
692        compactor_metrics: Arc<CompactorMetrics>,
693        await_tree_config: Option<await_tree::Config>,
694        use_new_object_prefix_strategy: bool,
695    ) -> StorageResult<Self> {
696        const KB: usize = 1 << 10;
697        const MB: usize = 1 << 20;
698
699        let meta_cache = {
700            let mut builder = HybridCacheBuilder::new()
701                .with_name("foyer.meta")
702                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
703                .memory(opts.meta_cache_capacity_mb * MB)
704                .with_shards(opts.meta_cache_shard_num)
705                .with_eviction_config(opts.meta_cache_eviction_config.clone())
706                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
707                    u64::BITS as usize / 8 + value.estimate_size()
708                })
709                .storage();
710
711            if !opts.meta_file_cache_dir.is_empty() {
712                if let Err(e) = Feature::ElasticDiskCache.check_available() {
713                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
714                } else {
715                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
716                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
717                        .with_throttle(opts.meta_file_cache_throttle.clone())
718                        .build()
719                        .map_err(HummockError::foyer_io_error)?;
720                    let engine_builder = BlockEngineBuilder::new(device)
721                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
722                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
723                        .with_flushers(opts.meta_file_cache_flushers)
724                        .with_reclaimers(opts.meta_file_cache_reclaimers)
725                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
726                        .with_clean_block_threshold(
727                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
728                        )
729                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
730                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
731                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
732                            opts.meta_file_cache_fifo_probation_ratio,
733                        ))]);
734                    builder = builder.with_engine_config(engine_builder);
735                }
736            }
737
738            builder.build().await.map_err(HummockError::foyer_error)?
739        };
740
741        let block_cache = {
742            let mut builder = HybridCacheBuilder::new()
743                .with_name("foyer.data")
744                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
745                .with_event_listener(Arc::new(BlockCacheEventListener::new(
746                    state_store_metrics.clone(),
747                )))
748                .memory(opts.block_cache_capacity_mb * MB)
749                .with_shards(opts.block_cache_shard_num)
750                .with_eviction_config(opts.block_cache_eviction_config.clone())
751                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
752                    // FIXME(MrCroxx): Calculate block weight more accurately.
753                    u64::BITS as usize * 2 / 8 + value.raw().len()
754                })
755                .storage();
756
757            if !opts.data_file_cache_dir.is_empty() {
758                if let Err(e) = Feature::ElasticDiskCache.check_available() {
759                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
760                } else {
761                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
762                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
763                        .with_throttle(opts.data_file_cache_throttle.clone())
764                        .build()
765                        .map_err(HummockError::foyer_io_error)?;
766                    let engine_builder = BlockEngineBuilder::new(device)
767                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
768                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
769                        .with_flushers(opts.data_file_cache_flushers)
770                        .with_reclaimers(opts.data_file_cache_reclaimers)
771                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
772                        .with_clean_block_threshold(
773                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
774                        )
775                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
776                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
777                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
778                            opts.data_file_cache_fifo_probation_ratio,
779                        ))]);
780                    builder = builder.with_engine_config(engine_builder);
781                }
782            }
783
784            builder.build().await.map_err(HummockError::foyer_error)?
785        };
786
787        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
788            .with_shards(opts.vector_meta_cache_shard_num)
789            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
790            .build();
791
792        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
793            .with_shards(opts.vector_block_cache_shard_num)
794            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
795            .build();
796
797        let recent_filter = if opts.data_file_cache_dir.is_empty() {
798            Arc::new(NoneRecentFilter::default().into())
799        } else if opts.cache_refill_recent_filter_shards == 1 {
800            Arc::new(
801                SimpleRecentFilter::new(
802                    opts.cache_refill_recent_filter_layers,
803                    Duration::from_millis(
804                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
805                    ),
806                )
807                .into(),
808            )
809        } else if opts.cache_refill_skip_recent_filter {
810            Arc::new(AllRecentFilter::default().into())
811        } else {
812            Arc::new(
813                ShardedRecentFilter::new(
814                    opts.cache_refill_recent_filter_layers,
815                    Duration::from_millis(
816                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
817                    ),
818                    opts.cache_refill_recent_filter_shards,
819                )
820                .into(),
821            )
822        };
823
824        let store = match s {
825            hummock if hummock.starts_with("hummock+") => {
826                let object_store = build_remote_object_store(
827                    hummock.strip_prefix("hummock+").unwrap(),
828                    object_store_metrics.clone(),
829                    "Hummock",
830                    Arc::new(opts.object_store_config.clone()),
831                )
832                .await;
833
834                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
835                    store: Arc::new(object_store),
836                    path: opts.data_directory.clone(),
837                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
838                    max_prefetch_block_number: opts.max_prefetch_block_number,
839                    recent_filter,
840                    state_store_metrics: state_store_metrics.clone(),
841                    use_new_object_prefix_strategy,
842
843                    meta_cache,
844                    block_cache,
845                    vector_meta_cache,
846                    vector_block_cache,
847                }));
848                let notification_client =
849                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
850                let compaction_catalog_manager_ref =
851                    Arc::new(CompactionCatalogManager::new(Box::new(
852                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
853                    )));
854
855                let inner = HummockStorage::new(
856                    opts.clone(),
857                    sstable_store,
858                    hummock_meta_client.clone(),
859                    notification_client,
860                    compaction_catalog_manager_ref,
861                    state_store_metrics.clone(),
862                    compactor_metrics.clone(),
863                    await_tree_config,
864                )
865                .await?;
866
867                StateStoreImpl::hummock(inner, storage_metrics)
868            }
869
870            "in_memory" | "in-memory" => {
871                tracing::warn!(
872                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
873                );
874                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
875            }
876
877            sled if sled.starts_with("sled://") => {
878                tracing::warn!(
879                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
880                );
881                let path = sled.strip_prefix("sled://").unwrap();
882                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
883            }
884
885            other => unimplemented!("{} state store is not supported", other),
886        };
887
888        Ok(store)
889    }
890}
891
892pub trait AsHummock: Send + Sync {
893    fn as_hummock(&self) -> Option<&HummockStorage>;
894
895    fn sync(
896        &self,
897        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
898    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
899        async move {
900            if let Some(hummock) = self.as_hummock() {
901                hummock.sync(sync_table_epochs).await
902            } else {
903                Ok(SyncResult::default())
904            }
905        }
906        .boxed()
907    }
908}
909
910impl AsHummock for HummockStorage {
911    fn as_hummock(&self) -> Option<&HummockStorage> {
912        Some(self)
913    }
914}
915
916impl AsHummock for MemoryStateStore {
917    fn as_hummock(&self) -> Option<&HummockStorage> {
918        None
919    }
920}
921
922impl AsHummock for SledStateStore {
923    fn as_hummock(&self) -> Option<&HummockStorage> {
924        None
925    }
926}
927
928#[cfg(debug_assertions)]
929mod dyn_state_store {
930    use std::future::Future;
931    use std::ops::DerefMut;
932    use std::sync::Arc;
933
934    use bytes::Bytes;
935    use risingwave_common::bitmap::Bitmap;
936    use risingwave_common::hash::VirtualNode;
937    use risingwave_hummock_sdk::HummockReadEpoch;
938    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
939
940    use crate::error::StorageResult;
941    use crate::hummock::HummockStorage;
942    use crate::store::*;
943    use crate::store_impl::AsHummock;
944    use crate::vector::VectorDistance;
945
946    #[async_trait::async_trait]
947    pub trait DynStateStoreIter<T: IterItem>: Send {
948        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
949    }
950
951    #[async_trait::async_trait]
952    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
953        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
954            self.try_next().await
955        }
956    }
957
958    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
959    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
960        fn try_next(
961            &mut self,
962        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
963            self.deref_mut().try_next()
964        }
965    }
966
967    // For StateStoreRead
968
969    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
970    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
971
972    #[async_trait::async_trait]
973    pub trait DynStateStoreGet: StaticSendSync {
974        async fn get_keyed_row(
975            &self,
976            key: TableKey<Bytes>,
977            read_options: ReadOptions,
978        ) -> StorageResult<Option<StateStoreKeyedRow>>;
979    }
980
981    #[async_trait::async_trait]
982    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
983        async fn iter(
984            &self,
985            key_range: TableKeyRange,
986
987            read_options: ReadOptions,
988        ) -> StorageResult<BoxStateStoreReadIter>;
989
990        async fn rev_iter(
991            &self,
992            key_range: TableKeyRange,
993
994            read_options: ReadOptions,
995        ) -> StorageResult<BoxStateStoreReadIter>;
996    }
997
998    #[async_trait::async_trait]
999    pub trait DynStateStoreReadLog: StaticSendSync {
1000        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
1001        async fn iter_log(
1002            &self,
1003            epoch_range: (u64, u64),
1004            key_range: TableKeyRange,
1005            options: ReadLogOptions,
1006        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
1007    }
1008
1009    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1010
1011    #[async_trait::async_trait]
1012    impl<S: StateStoreGet> DynStateStoreGet for S {
1013        async fn get_keyed_row(
1014            &self,
1015            key: TableKey<Bytes>,
1016            read_options: ReadOptions,
1017        ) -> StorageResult<Option<StateStoreKeyedRow>> {
1018            self.on_key_value(key, read_options, move |key, value| {
1019                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1020            })
1021            .await
1022        }
1023    }
1024
1025    #[async_trait::async_trait]
1026    impl<S: StateStoreRead> DynStateStoreRead for S {
1027        async fn iter(
1028            &self,
1029            key_range: TableKeyRange,
1030
1031            read_options: ReadOptions,
1032        ) -> StorageResult<BoxStateStoreReadIter> {
1033            Ok(Box::new(self.iter(key_range, read_options).await?))
1034        }
1035
1036        async fn rev_iter(
1037            &self,
1038            key_range: TableKeyRange,
1039
1040            read_options: ReadOptions,
1041        ) -> StorageResult<BoxStateStoreReadIter> {
1042            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1043        }
1044    }
1045
1046    #[async_trait::async_trait]
1047    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1048        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1049            self.next_epoch(epoch, options).await
1050        }
1051
1052        async fn iter_log(
1053            &self,
1054            epoch_range: (u64, u64),
1055            key_range: TableKeyRange,
1056            options: ReadLogOptions,
1057        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1058            Ok(Box::new(
1059                self.iter_log(epoch_range, key_range, options).await?,
1060            ))
1061        }
1062    }
1063
1064    // For LocalStateStore
1065    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1066    #[async_trait::async_trait]
1067    pub trait DynLocalStateStore:
1068        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
1069    {
1070        async fn iter(
1071            &self,
1072            key_range: TableKeyRange,
1073            read_options: ReadOptions,
1074        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1075
1076        async fn rev_iter(
1077            &self,
1078            key_range: TableKeyRange,
1079            read_options: ReadOptions,
1080        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1081
1082        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1083
1084        fn insert(
1085            &mut self,
1086            key: TableKey<Bytes>,
1087            new_val: Bytes,
1088            old_val: Option<Bytes>,
1089        ) -> StorageResult<()>;
1090
1091        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1092
1093        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1094
1095        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1096    }
1097
1098    #[async_trait::async_trait]
1099    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1100        async fn flush(&mut self) -> StorageResult<usize>;
1101
1102        async fn try_flush(&mut self) -> StorageResult<()>;
1103
1104        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1105
1106        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1107    }
1108
1109    #[async_trait::async_trait]
1110    impl<S: LocalStateStore> DynLocalStateStore for S {
1111        async fn iter(
1112            &self,
1113            key_range: TableKeyRange,
1114            read_options: ReadOptions,
1115        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1116            Ok(Box::new(self.iter(key_range, read_options).await?))
1117        }
1118
1119        async fn rev_iter(
1120            &self,
1121            key_range: TableKeyRange,
1122            read_options: ReadOptions,
1123        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1124            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1125        }
1126
1127        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1128            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1129        }
1130
1131        fn insert(
1132            &mut self,
1133            key: TableKey<Bytes>,
1134            new_val: Bytes,
1135            old_val: Option<Bytes>,
1136        ) -> StorageResult<()> {
1137            self.insert(key, new_val, old_val)
1138        }
1139
1140        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1141            self.delete(key, old_val)
1142        }
1143
1144        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1145            self.update_vnode_bitmap(vnodes).await
1146        }
1147
1148        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1149            self.get_table_watermark(vnode)
1150        }
1151    }
1152
1153    #[async_trait::async_trait]
1154    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1155        async fn flush(&mut self) -> StorageResult<usize> {
1156            self.flush().await
1157        }
1158
1159        async fn try_flush(&mut self) -> StorageResult<()> {
1160            self.try_flush().await
1161        }
1162
1163        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1164            self.init(options).await
1165        }
1166
1167        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1168            self.seal_current_epoch(next_epoch, opts)
1169        }
1170    }
1171
1172    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1173
1174    impl LocalStateStore for BoxDynLocalStateStore {
1175        type FlushedSnapshotReader = StateStoreReadDynRef;
1176        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1177        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1178
1179        fn iter(
1180            &self,
1181            key_range: TableKeyRange,
1182            read_options: ReadOptions,
1183        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1184            (*self.0).iter(key_range, read_options)
1185        }
1186
1187        fn rev_iter(
1188            &self,
1189            key_range: TableKeyRange,
1190            read_options: ReadOptions,
1191        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1192            (*self.0).rev_iter(key_range, read_options)
1193        }
1194
1195        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1196            (*self.0).new_flushed_snapshot_reader()
1197        }
1198
1199        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1200            (*self.0).get_table_watermark(vnode)
1201        }
1202
1203        fn insert(
1204            &mut self,
1205            key: TableKey<Bytes>,
1206            new_val: Bytes,
1207            old_val: Option<Bytes>,
1208        ) -> StorageResult<()> {
1209            (*self.0).insert(key, new_val, old_val)
1210        }
1211
1212        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1213            (*self.0).delete(key, old_val)
1214        }
1215
1216        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1217            (*self.0).update_vnode_bitmap(vnodes).await
1218        }
1219    }
1220
1221    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1222    where
1223        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1224    {
1225        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1226            self.as_mut().flush()
1227        }
1228
1229        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1230            self.as_mut().try_flush()
1231        }
1232
1233        fn init(
1234            &mut self,
1235            options: InitOptions,
1236        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1237            self.as_mut().init(options)
1238        }
1239
1240        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1241            self.as_mut().seal_current_epoch(next_epoch, opts)
1242        }
1243    }
1244
1245    #[async_trait::async_trait]
1246    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1247        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
1248    }
1249
1250    #[async_trait::async_trait]
1251    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1252        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1253            self.insert(vec, info)
1254        }
1255    }
1256
1257    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1258
1259    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1260        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1261            self.0.insert(vec, info)
1262        }
1263    }
1264
1265    // For global StateStore
1266
1267    #[async_trait::async_trait]
1268    pub trait DynStateStoreReadVector: StaticSendSync {
1269        async fn nearest(
1270            &self,
1271            vec: Vector,
1272            options: VectorNearestOptions,
1273        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1274    }
1275
1276    #[async_trait::async_trait]
1277    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1278        async fn nearest(
1279            &self,
1280            vec: Vector,
1281            options: VectorNearestOptions,
1282        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1283            use risingwave_common::types::ScalarRef;
1284            self.nearest(vec, options, |vec, distance, info| {
1285                (
1286                    vec.to_owned_scalar(),
1287                    distance,
1288                    Bytes::copy_from_slice(info),
1289                )
1290            })
1291            .await
1292        }
1293    }
1294
1295    impl<P> StateStoreReadVector for StateStorePointer<P>
1296    where
1297        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1298    {
1299        async fn nearest<O: Send + 'static>(
1300            &self,
1301            vec: Vector,
1302            options: VectorNearestOptions,
1303            on_nearest_item_fn: impl OnNearestItemFn<O>,
1304        ) -> StorageResult<Vec<O>> {
1305            let output = self.as_ref().nearest(vec, options).await?;
1306            Ok(output
1307                .into_iter()
1308                .map(|(vec, distance, info)| {
1309                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1310                })
1311                .collect())
1312        }
1313    }
1314
1315    pub trait DynStateStoreReadSnapshot:
1316        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1317    {
1318    }
1319
1320    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1321        for S
1322    {
1323    }
1324
1325    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1326    #[async_trait::async_trait]
1327    pub trait DynStateStoreExt: StaticSendSync {
1328        async fn try_wait_epoch(
1329            &self,
1330            epoch: HummockReadEpoch,
1331            options: TryWaitEpochOptions,
1332        ) -> StorageResult<()>;
1333
1334        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1335        async fn new_read_snapshot(
1336            &self,
1337            epoch: HummockReadEpoch,
1338            options: NewReadSnapshotOptions,
1339        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1340        async fn new_vector_writer(
1341            &self,
1342            options: NewVectorWriterOptions,
1343        ) -> BoxDynStateStoreWriteVector;
1344    }
1345
1346    #[async_trait::async_trait]
1347    impl<S: StateStore> DynStateStoreExt for S {
1348        async fn try_wait_epoch(
1349            &self,
1350            epoch: HummockReadEpoch,
1351            options: TryWaitEpochOptions,
1352        ) -> StorageResult<()> {
1353            self.try_wait_epoch(epoch, options).await
1354        }
1355
1356        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1357            StateStorePointer(Box::new(self.new_local(option).await))
1358        }
1359
1360        async fn new_read_snapshot(
1361            &self,
1362            epoch: HummockReadEpoch,
1363            options: NewReadSnapshotOptions,
1364        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1365            Ok(StateStorePointer(Arc::new(
1366                self.new_read_snapshot(epoch, options).await?,
1367            )))
1368        }
1369
1370        async fn new_vector_writer(
1371            &self,
1372            options: NewVectorWriterOptions,
1373        ) -> BoxDynStateStoreWriteVector {
1374            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1375        }
1376    }
1377
1378    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1379
1380    macro_rules! state_store_pointer_dyn_as_ref {
1381        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1382            impl AsRef<dyn $target_dyn_trait>
1383                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1384            {
1385                fn as_ref(&self) -> &dyn $target_dyn_trait {
1386                    (&*self.0) as _
1387                }
1388            }
1389        };
1390    }
1391
1392    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1393    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1394    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1395    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1396    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1397    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1398
1399    macro_rules! state_store_pointer_dyn_as_mut {
1400        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1401            impl AsMut<dyn $target_dyn_trait>
1402                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1403            {
1404                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1405                    (&mut *self.0) as _
1406                }
1407            }
1408        };
1409    }
1410
1411    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1412    state_store_pointer_dyn_as_mut!(
1413        Box<dyn DynStateStoreWriteVector>,
1414        DynStateStoreWriteEpochControl
1415    );
1416
1417    #[derive(Clone)]
1418    pub struct StateStorePointer<P>(pub(crate) P);
1419
1420    impl<P> StateStoreGet for StateStorePointer<P>
1421    where
1422        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1423    {
1424        async fn on_key_value<O: Send + 'static>(
1425            &self,
1426            key: TableKey<Bytes>,
1427            read_options: ReadOptions,
1428            on_key_value_fn: impl KeyValueFn<O>,
1429        ) -> StorageResult<Option<O>> {
1430            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1431            option
1432                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1433                .transpose()
1434        }
1435    }
1436
1437    impl<P> StateStoreRead for StateStorePointer<P>
1438    where
1439        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1440    {
1441        type Iter = BoxStateStoreReadIter;
1442        type RevIter = BoxStateStoreReadIter;
1443
1444        fn iter(
1445            &self,
1446            key_range: TableKeyRange,
1447
1448            read_options: ReadOptions,
1449        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1450            self.as_ref().iter(key_range, read_options)
1451        }
1452
1453        fn rev_iter(
1454            &self,
1455            key_range: TableKeyRange,
1456
1457            read_options: ReadOptions,
1458        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1459            self.as_ref().rev_iter(key_range, read_options)
1460        }
1461    }
1462
1463    impl StateStoreReadLog for StateStoreDynRef {
1464        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1465
1466        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1467            (*self.0).next_epoch(epoch, options).await
1468        }
1469
1470        fn iter_log(
1471            &self,
1472            epoch_range: (u64, u64),
1473            key_range: TableKeyRange,
1474            options: ReadLogOptions,
1475        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1476            (*self.0).iter_log(epoch_range, key_range, options)
1477        }
1478    }
1479
1480    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1481
1482    impl AsHummock for StateStoreDynRef {
1483        fn as_hummock(&self) -> Option<&HummockStorage> {
1484            (*self.0).as_hummock()
1485        }
1486    }
1487
1488    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1489
1490    impl StateStore for StateStoreDynRef {
1491        type Local = BoxDynLocalStateStore;
1492        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1493        type VectorWriter = BoxDynStateStoreWriteVector;
1494
1495        fn try_wait_epoch(
1496            &self,
1497            epoch: HummockReadEpoch,
1498            options: TryWaitEpochOptions,
1499        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1500            (*self.0).try_wait_epoch(epoch, options)
1501        }
1502
1503        fn new_local(
1504            &self,
1505            option: NewLocalOptions,
1506        ) -> impl Future<Output = Self::Local> + Send + '_ {
1507            (*self.0).new_local(option)
1508        }
1509
1510        async fn new_read_snapshot(
1511            &self,
1512            epoch: HummockReadEpoch,
1513            options: NewReadSnapshotOptions,
1514        ) -> StorageResult<Self::ReadSnapshot> {
1515            (*self.0).new_read_snapshot(epoch, options).await
1516        }
1517
1518        fn new_vector_writer(
1519            &self,
1520            options: NewVectorWriterOptions,
1521        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1522            (*self.0).new_vector_writer(options)
1523        }
1524    }
1525}