risingwave_storage/
store_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22    CacheBuilder, DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions,
23};
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
27use risingwave_common::catalog::TableId;
28use risingwave_common::license::Feature;
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common_service::RpcNotificationClient;
31use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
32use risingwave_object_store::object::build_remote_object_store;
33use thiserror_ext::AsReport;
34
35use crate::StateStore;
36use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
37use crate::error::StorageResult;
38use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
39use crate::hummock::{
40    Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
41    SstableBlockIndex, SstableStore, SstableStoreConfig,
42};
43use crate::memory::MemoryStateStore;
44use crate::memory::sled::SledStateStore;
45use crate::monitor::{
46    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
47    ObjectStoreMetrics,
48};
49use crate::opts::StorageOpts;
50
/// Metrics registry passed to the foyer cache builders in this file via
/// `with_metrics_registry`.
///
/// Lazily created on first access by wrapping a clone of `GLOBAL_METRICS_REGISTRY` in a boxed
/// `PrometheusMetricsRegistry`.
static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});
56
/// Declares the opaque `impl StateStore + AsHummock` aliases for each backend together with the
/// `#[define_opaque]` functions that fix their concrete types.
mod opaque_type {
    use super::*;

    /// Opaque type produced by [`hummock`].
    pub type HummockStorageType = impl StateStore + AsHummock;
    /// Opaque type produced by [`in_memory`].
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    /// Opaque type produced by [`sled`].
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Defines `MemoryStateStoreType` as the result of `may_dynamic_dispatch` on the memory store.
    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Defines `HummockStorageType`: the Hummock store is first passed through `may_verify` and
    /// then through `may_dynamic_dispatch`.
    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Defines `SledStateStoreType` as the result of `may_dynamic_dispatch` on the sled store.
    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
79pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
80use opaque_type::{hummock, in_memory, sled};
81
/// A state store wrapped in `MonitoredStateStore`. With the `hm-trace` feature enabled, the store
/// is additionally wrapped in a `TracedStateStore` underneath the monitor.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// A state store wrapped in `MonitoredStateStore` (the `hm-trace` feature is disabled, so there
/// is no tracing layer).
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
87
88fn monitored<S: StateStore>(
89    state_store: S,
90    storage_metrics: Arc<MonitoredStorageMetrics>,
91) -> Monitored<S> {
92    let inner = {
93        #[cfg(feature = "hm-trace")]
94        {
95            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
96        }
97        #[cfg(not(feature = "hm-trace"))]
98        {
99            state_store
100        }
101    };
102    inner.monitored(storage_metrics)
103}
104
105fn inner<S>(state_store: &Monitored<S>) -> &S {
106    let inner = state_store.inner();
107    {
108        #[cfg(feature = "hm-trace")]
109        {
110            inner.inner()
111        }
112        #[cfg(not(feature = "hm-trace"))]
113        {
114            inner
115        }
116    }
117}
118
/// The type erased [`StateStore`].
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// The Hummock state store, which operates on an S3-like service. URLs beginning with
    /// `hummock` will be automatically recognized as Hummock state store.
    ///
    /// Example URLs:
    ///
    /// * `hummock+s3://bucket`
    /// * `hummock+minio://KEY:SECRET@minio-ip:port`
    /// * `hummock+memory` (should only be used in 1 compute node mode)
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory B-Tree state store. Should only be used in unit and integration tests. If you
    /// want to speed up e2e tests, you should use Hummock in-memory mode instead. Also, this state
    /// store misses some critical implementation to ensure the correctness of persisting streaming
    /// state. (e.g., no `read_epoch` support, no async checkpoint)
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// State store built from a `crate::memory::sled::SledStateStore`. The dispatch macro in
    /// this file refuses to run it in builds without `debug_assertions`.
    SledStateStore(Monitored<SledStateStoreType>),
}
139
140fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
141    #[cfg(not(debug_assertions))]
142    {
143        state_store
144    }
145    #[cfg(debug_assertions)]
146    {
147        use crate::store_impl::dyn_state_store::StateStorePointer;
148        StateStorePointer(Arc::new(state_store) as _)
149    }
150}
151
152fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
153    #[cfg(not(debug_assertions))]
154    {
155        state_store
156    }
157    #[cfg(debug_assertions)]
158    {
159        use std::marker::PhantomData;
160
161        use risingwave_common::util::env_var::env_var_is_true;
162        use tracing::info;
163
164        use crate::store_impl::verify::VerifyStateStore;
165
166        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
167            info!("enable verify state store");
168            Some(SledStateStore::new_temp())
169        } else {
170            info!("verify state store is not enabled");
171            None
172        };
173        VerifyStateStore {
174            actual: state_store,
175            expected,
176            _phantom: PhantomData::<()>,
177        }
178    }
179}
180
181impl StateStoreImpl {
182    fn in_memory(
183        state_store: MemoryStateStore,
184        storage_metrics: Arc<MonitoredStorageMetrics>,
185    ) -> Self {
186        // The specific type of MemoryStateStoreType in deducted here.
187        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
188    }
189
190    pub fn hummock(
191        state_store: HummockStorage,
192        storage_metrics: Arc<MonitoredStorageMetrics>,
193    ) -> Self {
194        // The specific type of HummockStateStoreType in deducted here.
195        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
196    }
197
198    pub fn sled(
199        state_store: SledStateStore,
200        storage_metrics: Arc<MonitoredStorageMetrics>,
201    ) -> Self {
202        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
203    }
204
205    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
206        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
207    }
208
209    pub fn for_test() -> Self {
210        Self::in_memory(
211            MemoryStateStore::new(),
212            Arc::new(MonitoredStorageMetrics::unused()),
213        )
214    }
215
216    pub fn as_hummock(&self) -> Option<&HummockStorage> {
217        match self {
218            StateStoreImpl::HummockStateStore(hummock) => {
219                Some(inner(hummock).as_hummock().expect("should be hummock"))
220            }
221            _ => None,
222        }
223    }
224}
225
226impl Debug for StateStoreImpl {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        match self {
229            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
230            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
231            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
232        }
233    }
234}
235
/// Matches on a `StateStoreImpl` expression, binds the store held by the matched variant to
/// `$store`, and evaluates `$body` with that binding.
///
/// In builds without `debug_assertions`, the `MemoryStateStore` and `SledStateStore` arms do not
/// expand `$body` at all; they hit `unimplemented!` instead. The `HummockStateStore` arm always
/// expands `$body`.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // WARNING: don't change this. Enabling memory backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // WARNING: don't change this. Enabling this backend will cause monomorphization
                // explosion and thus slow compile time in release mode.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the store so the pattern variable is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
274
275#[cfg(any(debug_assertions, test, feature = "test"))]
276pub mod verify {
277    use std::fmt::Debug;
278    use std::future::Future;
279    use std::marker::PhantomData;
280    use std::ops::Deref;
281    use std::sync::Arc;
282
283    use bytes::Bytes;
284    use risingwave_common::bitmap::Bitmap;
285    use risingwave_common::hash::VirtualNode;
286    use risingwave_hummock_sdk::HummockReadEpoch;
287    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
288    use tracing::log::warn;
289
290    use crate::error::StorageResult;
291    use crate::hummock::HummockStorage;
292    use crate::store::*;
293    use crate::store_impl::AsHummock;
294
295    #[expect(dead_code)]
296    fn assert_result_eq<Item: PartialEq + Debug, E>(
297        first: &std::result::Result<Item, E>,
298        second: &std::result::Result<Item, E>,
299    ) {
300        match (first, second) {
301            (Ok(first), Ok(second)) => {
302                if first != second {
303                    warn!("result different: {:?} {:?}", first, second);
304                }
305                assert_eq!(first, second);
306            }
307            (Err(_), Err(_)) => {}
308            _ => {
309                warn!("one success and one failed");
310                panic!("result not equal");
311            }
312        }
313    }
314
    /// Pairs an `actual` value with an optional `expected` counterpart so that operations on the
    /// former can be cross-checked against the latter.
    ///
    /// The same struct is reused for stores, local stores, snapshots and iterators; `T` is only a
    /// marker (the item type in the iterator impl, `()` elsewhere).
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The value whose results are returned to the caller.
        pub actual: A,
        /// The reference value to compare against; `None` disables the comparison.
        pub expected: Option<E>,
        /// Marker for `T`; carries no data.
        pub _phantom: PhantomData<T>,
    }
321
    /// Only the `actual` side is consulted when looking for the underlying Hummock storage.
    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }
327
    /// Point lookups: reads from `actual`, optionally repeats the read on `expected` and asserts
    /// that both produced the same entry, then hands the `actual` entry to the caller's callback.
    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        async fn on_key_value<O: Send + 'static>(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<O>,
        ) -> StorageResult<Option<O>> {
            // Copy the entry into owned key/value so it can outlive the callback and be compared.
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                // Panics if the two stores disagree; each side is paired with its pure epoch
                // before the comparison.
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            // The caller's callback only ever sees the entry read from `actual`.
            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }
362
    /// Vector search is forwarded to `actual` only; `expected` is not consulted here.
    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        fn nearest<O: Send + 'static>(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> impl StorageFuture<'_, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }
375
376    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
377        type Iter = impl StateStoreReadIter;
378        type RevIter = impl StateStoreReadIter;
379
380        // TODO: may avoid manual async fn when the bug of rust compiler is fixed. Currently it will
381        // fail to compile.
382        #[expect(clippy::manual_async_fn)]
383        fn iter(
384            &self,
385            key_range: TableKeyRange,
386
387            read_options: ReadOptions,
388        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
389            async move {
390                let actual = self
391                    .actual
392                    .iter(key_range.clone(), read_options.clone())
393                    .await?;
394                let expected = if let Some(expected) = &self.expected {
395                    Some(expected.iter(key_range, read_options).await?)
396                } else {
397                    None
398                };
399
400                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
401            }
402        }
403
404        #[expect(clippy::manual_async_fn)]
405        fn rev_iter(
406            &self,
407            key_range: TableKeyRange,
408
409            read_options: ReadOptions,
410        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
411            async move {
412                let actual = self
413                    .actual
414                    .rev_iter(key_range.clone(), read_options.clone())
415                    .await?;
416                let expected = if let Some(expected) = &self.expected {
417                    Some(expected.rev_iter(key_range, read_options).await?)
418                } else {
419                    None
420                };
421
422                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
423            }
424        }
425    }
426
427    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
428        type ChangeLogIter = impl StateStoreReadChangeLogIter;
429
430        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
431            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
432            if let Some(expected) = &self.expected {
433                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
434            }
435            Ok(actual)
436        }
437
438        async fn iter_log(
439            &self,
440            epoch_range: (u64, u64),
441            key_range: TableKeyRange,
442            options: ReadLogOptions,
443        ) -> StorageResult<Self::ChangeLogIter> {
444            let actual = self
445                .actual
446                .iter_log(epoch_range, key_range.clone(), options.clone())
447                .await?;
448            let expected = if let Some(expected) = &self.expected {
449                Some(expected.iter_log(epoch_range, key_range, options).await?)
450            } else {
451                None
452            };
453
454            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
455        }
456    }
457
    /// Iterator form of the verifier: here `actual` and `expected` are iterators over `T`, and
    /// every item pulled from `actual` is compared with the item at the same position in
    /// `expected`.
    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                // Panics on any mismatch, including one side ending before the other.
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }
472
    /// Bundles an iterator with an optional reference iterator into a `VerifyStateStore`, whose
    /// `StateStoreIter` impl compares the two item by item.
    ///
    /// Passing `None` as `expected` yields an iterator that just forwards `actual`.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            // `T` is spelled out here because the struct does not otherwise mention it.
            _phantom: PhantomData::<T>,
        }
    }
486
487    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
488        type FlushedSnapshotReader =
489            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
490
491        type Iter<'a> = impl StateStoreIter + 'a;
492        type RevIter<'a> = impl StateStoreIter + 'a;
493
494        #[expect(clippy::manual_async_fn)]
495        fn iter(
496            &self,
497            key_range: TableKeyRange,
498            read_options: ReadOptions,
499        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
500            async move {
501                let actual = self
502                    .actual
503                    .iter(key_range.clone(), read_options.clone())
504                    .await?;
505                let expected = if let Some(expected) = &self.expected {
506                    Some(expected.iter(key_range, read_options).await?)
507                } else {
508                    None
509                };
510
511                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
512            }
513        }
514
515        #[expect(clippy::manual_async_fn)]
516        fn rev_iter(
517            &self,
518            key_range: TableKeyRange,
519            read_options: ReadOptions,
520        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
521            async move {
522                let actual = self
523                    .actual
524                    .rev_iter(key_range.clone(), read_options.clone())
525                    .await?;
526                let expected = if let Some(expected) = &self.expected {
527                    Some(expected.rev_iter(key_range, read_options).await?)
528                } else {
529                    None
530                };
531
532                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
533            }
534        }
535
536        fn insert(
537            &mut self,
538            key: TableKey<Bytes>,
539            new_val: Bytes,
540            old_val: Option<Bytes>,
541        ) -> StorageResult<()> {
542            if let Some(expected) = &mut self.expected {
543                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
544            }
545            self.actual.insert(key, new_val, old_val)?;
546
547            Ok(())
548        }
549
550        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
551            if let Some(expected) = &mut self.expected {
552                expected.delete(key.clone(), old_val.clone())?;
553            }
554            self.actual.delete(key, old_val)?;
555            Ok(())
556        }
557
558        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
559            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
560            if let Some(expected) = &mut self.expected {
561                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
562            }
563            Ok(ret)
564        }
565
566        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
567            let ret = self.actual.get_table_watermark(vnode);
568            if let Some(expected) = &self.expected {
569                assert_eq!(ret, expected.get_table_watermark(vnode));
570            }
571            ret
572        }
573
574        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
575            VerifyStateStore {
576                actual: self.actual.new_flushed_snapshot_reader(),
577                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
578                _phantom: Default::default(),
579            }
580        }
581    }
582
583    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
584        for VerifyStateStore<A, E>
585    {
586        async fn flush(&mut self) -> StorageResult<usize> {
587            if let Some(expected) = &mut self.expected {
588                expected.flush().await?;
589            }
590            self.actual.flush().await
591        }
592
593        async fn try_flush(&mut self) -> StorageResult<()> {
594            if let Some(expected) = &mut self.expected {
595                expected.try_flush().await?;
596            }
597            self.actual.try_flush().await
598        }
599
600        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
601            self.actual.init(options.clone()).await?;
602            if let Some(expected) = &mut self.expected {
603                expected.init(options).await?;
604            }
605            Ok(())
606        }
607
608        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
609            if let Some(expected) = &mut self.expected {
610                expected.seal_current_epoch(next_epoch, opts.clone());
611            }
612            self.actual.seal_current_epoch(next_epoch, opts);
613        }
614    }
615
616    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
617        type Local = VerifyStateStore<A::Local, E::Local>;
618        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
619        type VectorWriter = A::VectorWriter;
620
621        fn try_wait_epoch(
622            &self,
623            epoch: HummockReadEpoch,
624            options: TryWaitEpochOptions,
625        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
626            self.actual.try_wait_epoch(epoch, options)
627        }
628
629        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
630            let expected = if let Some(expected) = &self.expected {
631                Some(expected.new_local(option.clone()).await)
632            } else {
633                None
634            };
635            VerifyStateStore {
636                actual: self.actual.new_local(option).await,
637                expected,
638                _phantom: PhantomData::<()>,
639            }
640        }
641
642        async fn new_read_snapshot(
643            &self,
644            epoch: HummockReadEpoch,
645            options: NewReadSnapshotOptions,
646        ) -> StorageResult<Self::ReadSnapshot> {
647            let expected = if let Some(expected) = &self.expected {
648                Some(expected.new_read_snapshot(epoch, options).await?)
649            } else {
650                None
651            };
652            Ok(VerifyStateStore {
653                actual: self.actual.new_read_snapshot(epoch, options).await?,
654                expected,
655                _phantom: PhantomData::<()>,
656            })
657        }
658
659        fn new_vector_writer(
660            &self,
661            options: NewVectorWriterOptions,
662        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
663            self.actual.new_vector_writer(options)
664        }
665    }
666
    /// Dereferences to the `actual` value, so methods of `A` can be called directly on the
    /// wrapper. This impl only covers the default marker type `T = ()`.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
674}
675
676impl StateStoreImpl {
677    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
678    #[allow(clippy::too_many_arguments)]
679    #[expect(clippy::borrowed_box)]
680    pub async fn new(
681        s: &str,
682        opts: Arc<StorageOpts>,
683        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
684        state_store_metrics: Arc<HummockStateStoreMetrics>,
685        object_store_metrics: Arc<ObjectStoreMetrics>,
686        storage_metrics: Arc<MonitoredStorageMetrics>,
687        compactor_metrics: Arc<CompactorMetrics>,
688        await_tree_config: Option<await_tree::Config>,
689        use_new_object_prefix_strategy: bool,
690    ) -> StorageResult<Self> {
691        const KB: usize = 1 << 10;
692        const MB: usize = 1 << 20;
693
694        let meta_cache = {
695            let mut builder = HybridCacheBuilder::new()
696                .with_name("foyer.meta")
697                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
698                .memory(opts.meta_cache_capacity_mb * MB)
699                .with_shards(opts.meta_cache_shard_num)
700                .with_eviction_config(opts.meta_cache_eviction_config.clone())
701                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
702                    u64::BITS as usize / 8 + value.estimate_size()
703                })
704                .storage(Engine::Large(
705                    LargeEngineOptions::new()
706                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
707                        .with_flushers(opts.meta_file_cache_flushers)
708                        .with_reclaimers(opts.meta_file_cache_reclaimers)
709                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
710                        .with_clean_region_threshold(
711                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
712                        )
713                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
714                        .with_blob_index_size(16 * KB)
715                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
716                            opts.meta_file_cache_fifo_probation_ratio,
717                        ))]),
718                ));
719
720            if !opts.meta_file_cache_dir.is_empty() {
721                if let Err(e) = Feature::ElasticDiskCache.check_available() {
722                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
723                } else {
724                    builder = builder
725                        .with_device_options(
726                            DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
727                                .with_capacity(opts.meta_file_cache_capacity_mb * MB)
728                                .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
729                                .with_throttle(opts.meta_file_cache_throttle.clone()),
730                        )
731                        .with_recover_mode(opts.meta_file_cache_recover_mode)
732                        .with_compression(opts.meta_file_cache_compression)
733                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
734                }
735            }
736
737            builder.build().await.map_err(HummockError::foyer_error)?
738        };
739
740        let block_cache = {
741            let mut builder = HybridCacheBuilder::new()
742                .with_name("foyer.data")
743                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
744                .with_event_listener(Arc::new(BlockCacheEventListener::new(
745                    state_store_metrics.clone(),
746                )))
747                .memory(opts.block_cache_capacity_mb * MB)
748                .with_shards(opts.block_cache_shard_num)
749                .with_eviction_config(opts.block_cache_eviction_config.clone())
750                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
751                    // FIXME(MrCroxx): Calculate block weight more accurately.
752                    u64::BITS as usize * 2 / 8 + value.raw().len()
753                })
754                .storage(Engine::Large(
755                    LargeEngineOptions::new()
756                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
757                        .with_flushers(opts.data_file_cache_flushers)
758                        .with_reclaimers(opts.data_file_cache_reclaimers)
759                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) // 128 MiB
760                        .with_clean_region_threshold(
761                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
762                        )
763                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
764                        .with_blob_index_size(16 * KB)
765                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
766                            opts.data_file_cache_fifo_probation_ratio,
767                        ))]),
768                ));
769
770            if !opts.data_file_cache_dir.is_empty() {
771                if let Err(e) = Feature::ElasticDiskCache.check_available() {
772                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
773                } else {
774                    builder = builder
775                        .with_device_options(
776                            DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
777                                .with_capacity(opts.data_file_cache_capacity_mb * MB)
778                                .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
779                                .with_throttle(opts.data_file_cache_throttle.clone()),
780                        )
781                        .with_recover_mode(opts.data_file_cache_recover_mode)
782                        .with_compression(opts.data_file_cache_compression)
783                        .with_runtime_options(opts.data_file_cache_runtime_config.clone());
784                }
785            }
786
787            builder.build().await.map_err(HummockError::foyer_error)?
788        };
789
790        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
791            .with_shards(opts.vector_meta_cache_shard_num)
792            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
793            .build();
794
795        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
796            .with_shards(opts.vector_block_cache_shard_num)
797            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
798            .build();
799
800        let recent_filter = if opts.data_file_cache_dir.is_empty() {
801            None
802        } else {
803            Some(Arc::new(RecentFilter::new(
804                opts.cache_refill_recent_filter_layers,
805                Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
806            )))
807        };
808
809        let store = match s {
810            hummock if hummock.starts_with("hummock+") => {
811                let object_store = build_remote_object_store(
812                    hummock.strip_prefix("hummock+").unwrap(),
813                    object_store_metrics.clone(),
814                    "Hummock",
815                    Arc::new(opts.object_store_config.clone()),
816                )
817                .await;
818
819                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
820                    store: Arc::new(object_store),
821                    path: opts.data_directory.clone(),
822                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
823                    max_prefetch_block_number: opts.max_prefetch_block_number,
824                    recent_filter,
825                    state_store_metrics: state_store_metrics.clone(),
826                    use_new_object_prefix_strategy,
827
828                    meta_cache,
829                    block_cache,
830                    vector_meta_cache,
831                    vector_block_cache,
832                }));
833                let notification_client =
834                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
835                let compaction_catalog_manager_ref =
836                    Arc::new(CompactionCatalogManager::new(Box::new(
837                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
838                    )));
839
840                let inner = HummockStorage::new(
841                    opts.clone(),
842                    sstable_store,
843                    hummock_meta_client.clone(),
844                    notification_client,
845                    compaction_catalog_manager_ref,
846                    state_store_metrics.clone(),
847                    compactor_metrics.clone(),
848                    await_tree_config,
849                )
850                .await?;
851
852                StateStoreImpl::hummock(inner, storage_metrics)
853            }
854
855            "in_memory" | "in-memory" => {
856                tracing::warn!(
857                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
858                );
859                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
860            }
861
862            sled if sled.starts_with("sled://") => {
863                tracing::warn!(
864                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
865                );
866                let path = sled.strip_prefix("sled://").unwrap();
867                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
868            }
869
870            other => unimplemented!("{} state store is not supported", other),
871        };
872
873        Ok(store)
874    }
875}
876
877pub trait AsHummock: Send + Sync {
878    fn as_hummock(&self) -> Option<&HummockStorage>;
879
880    fn sync(
881        &self,
882        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
883    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
884        async move {
885            if let Some(hummock) = self.as_hummock() {
886                hummock.sync(sync_table_epochs).await
887            } else {
888                Ok(SyncResult::default())
889            }
890        }
891        .boxed()
892    }
893}
894
impl AsHummock for HummockStorage {
    /// A `HummockStorage` is its own Hummock backend, so this always returns `Some(self)`.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
900
impl AsHummock for MemoryStateStore {
    /// The in-memory store exposes no `HummockStorage`, so this always returns `None`
    /// (and the default `AsHummock::sync` therefore yields `SyncResult::default()`).
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
906
impl AsHummock for SledStateStore {
    /// The sled store exposes no `HummockStorage`, so this always returns `None`
    /// (and the default `AsHummock::sync` therefore yields `SyncResult::default()`).
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
912
913#[cfg(debug_assertions)]
914mod dyn_state_store {
915    use std::future::Future;
916    use std::ops::DerefMut;
917    use std::sync::Arc;
918
919    use bytes::Bytes;
920    use risingwave_common::bitmap::Bitmap;
921    use risingwave_common::hash::VirtualNode;
922    use risingwave_hummock_sdk::HummockReadEpoch;
923    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
924
925    use crate::error::StorageResult;
926    use crate::hummock::HummockStorage;
927    use crate::store::*;
928    use crate::store_impl::AsHummock;
929    use crate::vector::VectorDistance;
930
    /// Dyn-compatible counterpart of [`StateStoreIter`], used so that iterators can be
    /// stored behind `Box<dyn DynStateStoreIter<T>>` (see `BoxStateStoreIter`).
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        /// Fetches the next item; mirrors `StateStoreIter::try_next`.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }
935
936    #[async_trait::async_trait]
937    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
938        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
939            self.try_next().await
940        }
941    }
942
943    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
944    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
945        fn try_next(
946            &mut self,
947        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
948            self.deref_mut().try_next()
949        }
950    }
951
952    // For StateStoreRead
953
    /// Boxed `'static` iterator over [`StateStoreKeyedRow`] items, returned by
    /// `DynStateStoreRead::iter` / `rev_iter`.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// Boxed `'static` iterator over [`StateStoreReadLogItem`] items, returned by
    /// `DynStateStoreReadLog::iter_log`.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
956
    /// Dyn-compatible counterpart of [`StateStoreGet`].
    ///
    /// The static trait hands the result to a caller-supplied callback; this trait
    /// returns an owned [`StateStoreKeyedRow`] instead, so no generic parameter is needed.
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        /// Looks up `key` and returns the matching row as owned data, or `None` when
        /// the underlying store produces no value.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }
965
    /// Dyn-compatible counterpart of [`StateStoreRead`]: the associated iterator types
    /// are replaced by the boxed [`BoxStateStoreReadIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        /// Mirrors `StateStoreRead::iter`, returning a boxed iterator over `key_range`.
        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        /// Mirrors `StateStoreRead::rev_iter`, returning a boxed iterator over `key_range`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }
982
    /// Dyn-compatible counterpart of [`StateStoreReadLog`]: the associated change-log
    /// iterator type is replaced by the boxed [`BoxStateStoreReadChangeLogIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        /// Mirrors `StateStoreReadLog::next_epoch`.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        /// Mirrors `StateStoreReadLog::iter_log`, returning a boxed iterator.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }
993
    /// Shared, type-erased handle to a [`DynStateStoreRead`]; this is the reader type
    /// returned by `DynLocalStateStore::new_flushed_snapshot_reader`.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
995
996    #[async_trait::async_trait]
997    impl<S: StateStoreGet> DynStateStoreGet for S {
998        async fn get_keyed_row(
999            &self,
1000            key: TableKey<Bytes>,
1001            read_options: ReadOptions,
1002        ) -> StorageResult<Option<StateStoreKeyedRow>> {
1003            self.on_key_value(key, read_options, move |key, value| {
1004                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1005            })
1006            .await
1007        }
1008    }
1009
1010    #[async_trait::async_trait]
1011    impl<S: StateStoreRead> DynStateStoreRead for S {
1012        async fn iter(
1013            &self,
1014            key_range: TableKeyRange,
1015
1016            read_options: ReadOptions,
1017        ) -> StorageResult<BoxStateStoreReadIter> {
1018            Ok(Box::new(self.iter(key_range, read_options).await?))
1019        }
1020
1021        async fn rev_iter(
1022            &self,
1023            key_range: TableKeyRange,
1024
1025            read_options: ReadOptions,
1026        ) -> StorageResult<BoxStateStoreReadIter> {
1027            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1028        }
1029    }
1030
1031    #[async_trait::async_trait]
1032    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1033        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1034            self.next_epoch(epoch, options).await
1035        }
1036
1037        async fn iter_log(
1038            &self,
1039            epoch_range: (u64, u64),
1040            key_range: TableKeyRange,
1041            options: ReadLogOptions,
1042        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1043            Ok(Box::new(
1044                self.iter_log(epoch_range, key_range, options).await?,
1045            ))
1046        }
1047    }
1048
1049    // For LocalStateStore
    /// Boxed iterator over [`StateStoreKeyedRow`] items that may borrow from the local
    /// store for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// Dyn-compatible counterpart of [`LocalStateStore`]: associated types are replaced
    /// by boxed iterators and by [`StateStoreReadDynRef`].
    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        /// Mirrors `LocalStateStore::iter`; the boxed iterator borrows from `self`.
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Mirrors `LocalStateStore::rev_iter`; the boxed iterator borrows from `self`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Mirrors `LocalStateStore::new_flushed_snapshot_reader`, returning the reader
        /// as a type-erased [`StateStoreReadDynRef`].
        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        /// Mirrors `LocalStateStore::insert`.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        /// Mirrors `LocalStateStore::delete`.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        /// Mirrors `LocalStateStore::update_vnode_bitmap`.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        /// Mirrors `LocalStateStore::get_table_watermark`.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }
1082
1083    #[async_trait::async_trait]
1084    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1085        async fn flush(&mut self) -> StorageResult<usize>;
1086
1087        async fn try_flush(&mut self) -> StorageResult<()>;
1088
1089        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1090
1091        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1092    }
1093
1094    #[async_trait::async_trait]
1095    impl<S: LocalStateStore> DynLocalStateStore for S {
1096        async fn iter(
1097            &self,
1098            key_range: TableKeyRange,
1099            read_options: ReadOptions,
1100        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1101            Ok(Box::new(self.iter(key_range, read_options).await?))
1102        }
1103
1104        async fn rev_iter(
1105            &self,
1106            key_range: TableKeyRange,
1107            read_options: ReadOptions,
1108        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1109            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1110        }
1111
1112        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1113            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1114        }
1115
1116        fn insert(
1117            &mut self,
1118            key: TableKey<Bytes>,
1119            new_val: Bytes,
1120            old_val: Option<Bytes>,
1121        ) -> StorageResult<()> {
1122            self.insert(key, new_val, old_val)
1123        }
1124
1125        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1126            self.delete(key, old_val)
1127        }
1128
1129        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1130            self.update_vnode_bitmap(vnodes).await
1131        }
1132
1133        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1134            self.get_table_watermark(vnode)
1135        }
1136    }
1137
1138    #[async_trait::async_trait]
1139    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1140        async fn flush(&mut self) -> StorageResult<usize> {
1141            self.flush().await
1142        }
1143
1144        async fn try_flush(&mut self) -> StorageResult<()> {
1145            self.try_flush().await
1146        }
1147
1148        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1149            self.init(options).await
1150        }
1151
1152        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1153            self.seal_current_epoch(next_epoch, opts)
1154        }
1155    }
1156
1157    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1158
1159    impl LocalStateStore for BoxDynLocalStateStore {
1160        type FlushedSnapshotReader = StateStoreReadDynRef;
1161        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1162        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1163
1164        fn iter(
1165            &self,
1166            key_range: TableKeyRange,
1167            read_options: ReadOptions,
1168        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1169            (*self.0).iter(key_range, read_options)
1170        }
1171
1172        fn rev_iter(
1173            &self,
1174            key_range: TableKeyRange,
1175            read_options: ReadOptions,
1176        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1177            (*self.0).rev_iter(key_range, read_options)
1178        }
1179
1180        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1181            (*self.0).new_flushed_snapshot_reader()
1182        }
1183
1184        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1185            (*self.0).get_table_watermark(vnode)
1186        }
1187
1188        fn insert(
1189            &mut self,
1190            key: TableKey<Bytes>,
1191            new_val: Bytes,
1192            old_val: Option<Bytes>,
1193        ) -> StorageResult<()> {
1194            (*self.0).insert(key, new_val, old_val)
1195        }
1196
1197        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1198            (*self.0).delete(key, old_val)
1199        }
1200
1201        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1202            (*self.0).update_vnode_bitmap(vnodes).await
1203        }
1204    }
1205
1206    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1207    where
1208        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1209    {
1210        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1211            self.as_mut().flush()
1212        }
1213
1214        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1215            self.as_mut().try_flush()
1216        }
1217
1218        fn init(
1219            &mut self,
1220            options: InitOptions,
1221        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1222            self.as_mut().init(options)
1223        }
1224
1225        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1226            self.as_mut().seal_current_epoch(next_epoch, opts)
1227        }
1228    }
1229
    /// Dyn-compatible counterpart of [`StateStoreWriteVector`]; epoch control is
    /// inherited from [`DynStateStoreWriteEpochControl`].
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        /// Mirrors `StateStoreWriteVector::insert`.
        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
    }
1234
1235    #[async_trait::async_trait]
1236    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1237        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1238            self.insert(vec, info)
1239        }
1240    }
1241
1242    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1243
1244    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1245        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1246            self.0.insert(vec, info)
1247        }
1248    }
1249
1250    // For global StateStore
1251
    /// Dyn-compatible counterpart of [`StateStoreReadVector`].
    ///
    /// The static trait hands each hit to a caller-supplied callback; this trait
    /// returns the hits as owned `(vector, distance, info)` tuples instead.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        /// Runs a nearest-neighbour query for `vec` and returns the owned results.
        async fn nearest(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }
1260
1261    #[async_trait::async_trait]
1262    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1263        async fn nearest(
1264            &self,
1265            vec: Vector,
1266            options: VectorNearestOptions,
1267        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1268            self.nearest(vec, options, |vec, distance, info| {
1269                (
1270                    Vector::clone_from_ref(vec),
1271                    distance,
1272                    Bytes::copy_from_slice(info),
1273                )
1274            })
1275            .await
1276        }
1277    }
1278
1279    impl<P> StateStoreReadVector for StateStorePointer<P>
1280    where
1281        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1282    {
1283        async fn nearest<O: Send + 'static>(
1284            &self,
1285            vec: Vector,
1286            options: VectorNearestOptions,
1287            on_nearest_item_fn: impl OnNearestItemFn<O>,
1288        ) -> StorageResult<Vec<O>> {
1289            let output = self.as_ref().nearest(vec, options).await?;
1290            Ok(output
1291                .into_iter()
1292                .map(|(vec, distance, info)| {
1293                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1294                })
1295                .collect())
1296        }
1297    }
1298
    /// Marker trait bundling [`DynStateStoreRead`] and [`DynStateStoreReadVector`] so
    /// that a read snapshot can be stored behind a single trait object.
    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }

    // Automatically implemented for every type that satisfies the supertraits.
    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }
1308
    /// Shared, type-erased handle to a read snapshot.
    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
    /// Dyn-compatible counterpart of the methods declared on [`StateStore`]: the
    /// associated local-store, snapshot and vector-writer types are replaced by their
    /// boxed / `Arc`-ed dyn equivalents.
    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        /// Mirrors `StateStore::try_wait_epoch`.
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        /// Mirrors `StateStore::new_local`, returning the local store boxed.
        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        /// Mirrors `StateStore::new_read_snapshot`, returning the snapshot behind an `Arc`.
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        /// Mirrors `StateStore::new_vector_writer`, returning the writer boxed.
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }
1329
1330    #[async_trait::async_trait]
1331    impl<S: StateStore> DynStateStoreExt for S {
1332        async fn try_wait_epoch(
1333            &self,
1334            epoch: HummockReadEpoch,
1335            options: TryWaitEpochOptions,
1336        ) -> StorageResult<()> {
1337            self.try_wait_epoch(epoch, options).await
1338        }
1339
1340        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1341            StateStorePointer(Box::new(self.new_local(option).await))
1342        }
1343
1344        async fn new_read_snapshot(
1345            &self,
1346            epoch: HummockReadEpoch,
1347            options: NewReadSnapshotOptions,
1348        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1349            Ok(StateStorePointer(Arc::new(
1350                self.new_read_snapshot(epoch, options).await?,
1351            )))
1352        }
1353
1354        async fn new_vector_writer(
1355            &self,
1356            options: NewVectorWriterOptions,
1357        ) -> BoxDynStateStoreWriteVector {
1358            StateStorePointer(Box::new(self.new_vector_writer(options).await))
1359        }
1360    }
1361
    /// Shared, type-erased handle to a whole state store (see [`DynStateStore`]).
    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1363
    /// Generates `impl AsRef<dyn $target_dyn_trait>` for
    /// `StateStorePointer<$pointer<dyn $source_dyn_trait>>`.
    ///
    /// The generated `as_ref` dereferences the inner pointer and casts the resulting
    /// `&dyn $source_dyn_trait` to `&dyn $target_dyn_trait`.
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    // These `AsRef` impls are what satisfy the `where` clauses of the generic
    // `StateStoreGet` / `StateStoreRead` / `StateStoreReadVector` impls on
    // `StateStorePointer<P>` for the concrete pointer types listed here.
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1382
    /// Generates `impl AsMut<dyn $target_dyn_trait>` for
    /// `StateStorePointer<$pointer<dyn $source_dyn_trait>>`.
    ///
    /// The generated `as_mut` mutably dereferences the inner pointer and casts the
    /// resulting `&mut dyn $source_dyn_trait` to `&mut dyn $target_dyn_trait`.
    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    // These `AsMut` impls are what satisfy the `where` clause of the generic
    // `StateStoreWriteEpochControl` impl on `StateStorePointer<P>` for the two boxed
    // writer types.
    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );
1400
    /// Newtype around a smart pointer (`Box<dyn …>` / `Arc<dyn …>` in this module) to a
    /// dyn state-store trait object. The static state-store traits are implemented on
    /// this wrapper by forwarding to the wrapped trait object.
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);
1403
1404    impl<P> StateStoreGet for StateStorePointer<P>
1405    where
1406        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1407    {
1408        async fn on_key_value<O: Send + 'static>(
1409            &self,
1410            key: TableKey<Bytes>,
1411            read_options: ReadOptions,
1412            on_key_value_fn: impl KeyValueFn<O>,
1413        ) -> StorageResult<Option<O>> {
1414            let option = self.as_ref().get_keyed_row(key, read_options).await?;
1415            option
1416                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1417                .transpose()
1418        }
1419    }
1420
1421    impl<P> StateStoreRead for StateStorePointer<P>
1422    where
1423        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1424    {
1425        type Iter = BoxStateStoreReadIter;
1426        type RevIter = BoxStateStoreReadIter;
1427
1428        fn iter(
1429            &self,
1430            key_range: TableKeyRange,
1431
1432            read_options: ReadOptions,
1433        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1434            self.as_ref().iter(key_range, read_options)
1435        }
1436
1437        fn rev_iter(
1438            &self,
1439            key_range: TableKeyRange,
1440
1441            read_options: ReadOptions,
1442        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1443            self.as_ref().rev_iter(key_range, read_options)
1444        }
1445    }
1446
1447    impl StateStoreReadLog for StateStoreDynRef {
1448        type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1449
1450        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1451            (*self.0).next_epoch(epoch, options).await
1452        }
1453
1454        fn iter_log(
1455            &self,
1456            epoch_range: (u64, u64),
1457            key_range: TableKeyRange,
1458            options: ReadLogOptions,
1459        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1460            (*self.0).iter_log(epoch_range, key_range, options)
1461        }
1462    }
1463
    /// Marker trait bundling everything a type-erased global state store must provide:
    /// change-log reads, the [`DynStateStoreExt`] methods, and [`AsHummock`].
    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1465
1466    impl AsHummock for StateStoreDynRef {
1467        fn as_hummock(&self) -> Option<&HummockStorage> {
1468            (*self.0).as_hummock()
1469        }
1470    }
1471
    // Automatically implemented for every type that satisfies the supertraits.
    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1473
1474    impl StateStore for StateStoreDynRef {
1475        type Local = BoxDynLocalStateStore;
1476        type ReadSnapshot = StateStoreReadSnapshotDynRef;
1477        type VectorWriter = BoxDynStateStoreWriteVector;
1478
1479        fn try_wait_epoch(
1480            &self,
1481            epoch: HummockReadEpoch,
1482            options: TryWaitEpochOptions,
1483        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1484            (*self.0).try_wait_epoch(epoch, options)
1485        }
1486
1487        fn new_local(
1488            &self,
1489            option: NewLocalOptions,
1490        ) -> impl Future<Output = Self::Local> + Send + '_ {
1491            (*self.0).new_local(option)
1492        }
1493
1494        async fn new_read_snapshot(
1495            &self,
1496            epoch: HummockReadEpoch,
1497            options: NewReadSnapshotOptions,
1498        ) -> StorageResult<Self::ReadSnapshot> {
1499            (*self.0).new_read_snapshot(epoch, options).await
1500        }
1501
1502        fn new_vector_writer(
1503            &self,
1504            options: NewVectorWriterOptions,
1505        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1506            (*self.0).new_vector_writer(options)
1507        }
1508    }
1509}