use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use enum_as_inner::EnumAsInner;
use foyer::{
    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
    HybridCacheBuilder,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use risingwave_common::catalog::TableId;
use risingwave_common::license::Feature;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common_service::RpcNotificationClient;
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
use risingwave_object_store::object::build_remote_object_store;
use thiserror_ext::AsReport;

use crate::StateStore;
use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::all::AllRecentFilter;
use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::sharded::ShardedRecentFilter;
use crate::hummock::simple::SimpleRecentFilter;
use crate::hummock::{
    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
    SstableStore, SstableStoreConfig,
};
use crate::memory::MemoryStateStore;
use crate::memory::sled::SledStateStore;
use crate::monitor::{
    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
    ObjectStoreMetrics,
};
use crate::opts::StorageOpts;

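// Shared metrics registry for the foyer hybrid caches built below, backed by the global
// Prometheus registry.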
static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

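// Opaque aliases over the concrete store types. Depending on the build mode, the helpers below
// may wrap a store in a dynamic-dispatch pointer and/or a verification layer, and these aliases
// hide that difference from callers.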
mod opaque_type {
    use super::*;

    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
use opaque_type::{hummock, in_memory, sled};

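// `Monitored<S>` adds the metrics layer, with an extra trace-record layer underneath when the
// `hm-trace` feature is enabled.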
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;

fn monitored<S: StateStore>(
    state_store: S,
    storage_metrics: Arc<MonitoredStorageMetrics>,
) -> Monitored<S> {
    let inner = {
        #[cfg(feature = "hm-trace")]
        {
            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            state_store
        }
    };
    inner.monitored(storage_metrics)
}

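// Peels off the monitoring (and, if enabled, tracing) layers to reach the wrapped store.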
fn inner<S>(state_store: &Monitored<S>) -> &S {
    let inner = state_store.inner();
    {
        #[cfg(feature = "hm-trace")]
        {
            inner.inner()
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            inner
        }
    }
}

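// Top-level state store dispatcher. Every variant carries its backend wrapped in the `Monitored`
// layer.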
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    HummockStateStore(Monitored<HummockStorageType>),
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    SledStateStore(Monitored<SledStateStoreType>),
}

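// In debug builds, erase the concrete store type behind a dynamically dispatched
// `StateStorePointer` (likely to keep debug compile times and code size in check); release builds
// keep the concrete type.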
fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use crate::store_impl::dyn_state_store::StateStorePointer;
        StateStorePointer(Arc::new(state_store) as _)
    }
}

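// In debug builds, optionally mirror the store into a temporary sled instance and assert that
// both return the same results. Controlled by the `ENABLE_STATE_STORE_VERIFY` environment
// variable.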
fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use std::marker::PhantomData;

        use risingwave_common::util::env_var::env_var_is_true;
        use tracing::info;

        use crate::store_impl::verify::VerifyStateStore;

        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
            info!("enable verify state store");
            Some(SledStateStore::new_temp())
        } else {
            info!("verify state store is not enabled");
            None
        };
        VerifyStateStore {
            actual: state_store,
            expected,
            _phantom: PhantomData::<()>,
        }
    }
}

impl StateStoreImpl {
    fn in_memory(
        state_store: MemoryStateStore,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
    }

    pub fn hummock(
        state_store: HummockStorage,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
    }

    pub fn sled(
        state_store: SledStateStore,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
    }

    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
    }

    pub fn for_test() -> Self {
        Self::in_memory(
            MemoryStateStore::new(),
            Arc::new(MonitoredStorageMetrics::unused()),
        )
    }

    pub fn as_hummock(&self) -> Option<&HummockStorage> {
        match self {
            StateStoreImpl::HummockStateStore(hummock) => {
                Some(inner(hummock).as_hummock().expect("should be hummock"))
            }
            _ => None,
        }
    }
}

impl Debug for StateStoreImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
        }
    }
}

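/// Matches on the concrete [`StateStoreImpl`] variant and binds the monitored store to `$store`
/// for use in `$body`. In release builds the in-memory and sled arms are `unimplemented!`.
///
/// Illustrative usage (hypothetical call site, not taken from the original source):
/// ```ignore
/// dispatch_state_store!(state_store_impl, store, {
///     // `store` is the backend-specific monitored state store here.
///     do_something_with(store)
/// });
/// ```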
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}

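// Debug/test-only verification layer: every operation runs against the `actual` store and, when
// an `expected` reference store is configured, against that store as well, asserting the two
// agree.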
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::Deref;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::array::VectorRef;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
    use tracing::log::warn;

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;

    #[expect(dead_code)]
    fn assert_result_eq<Item: PartialEq + Debug, E>(
        first: &std::result::Result<Item, E>,
        second: &std::result::Result<Item, E>,
    ) {
        match (first, second) {
            (Ok(first), Ok(second)) => {
                if first != second {
                    warn!("result different: {:?} {:?}", first, second);
                }
                assert_eq!(first, second);
            }
            (Err(_), Err(_)) => {}
            _ => {
                warn!("one success and one failed");
                panic!("result not equal");
            }
        }
    }

    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        pub actual: A,
        pub expected: Option<E>,
        pub _phantom: PhantomData<T>,
    }

    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }

    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> impl StorageFuture<'a, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }

    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        type Iter = impl StateStoreReadIter;
        type RevIter = impl StateStoreReadIter;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }

    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
        type ChangeLogIter = impl StateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
            if let Some(expected) = &self.expected {
                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
            }
            Ok(actual)
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<Self::ChangeLogIter> {
            let actual = self
                .actual
                .iter_log(epoch_range, key_range.clone(), options.clone())
                .await?;
            let expected = if let Some(expected) = &self.expected {
                Some(expected.iter_log(epoch_range, key_range, options).await?)
            } else {
                None
            };

            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
        }
    }

    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }

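    // Pairs an actual iterator with an optional expected one; the `StateStoreIter` impl above
    // then asserts item-by-item equality as the pair is advanced.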
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }

    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        type Iter<'a> = impl StateStoreIter + 'a;
        type RevIter<'a> = impl StateStoreIter + 'a;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }

    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
        for VerifyStateStore<A, E>
    {
        async fn flush(&mut self) -> StorageResult<usize> {
            if let Some(expected) = &mut self.expected {
                expected.flush().await?;
            }
            self.actual.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.try_flush().await?;
            }
            self.actual.try_flush().await
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.actual.init(options.clone()).await?;
            if let Some(expected) = &mut self.expected {
                expected.init(options).await?;
            }
            Ok(())
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            if let Some(expected) = &mut self.expected {
                expected.seal_current_epoch(next_epoch, opts.clone());
            }
            self.actual.seal_current_epoch(next_epoch, opts);
        }
    }

    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
        type Local = VerifyStateStore<A::Local, E::Local>;
        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
        type VectorWriter = A::VectorWriter;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.actual.try_wait_epoch(epoch, options)
        }

        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_local(option.clone()).await)
            } else {
                None
            };
            VerifyStateStore {
                actual: self.actual.new_local(option).await,
                expected,
                _phantom: PhantomData::<()>,
            }
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_read_snapshot(epoch, options).await?)
            } else {
                None
            };
            Ok(VerifyStateStore {
                actual: self.actual.new_read_snapshot(epoch, options).await?,
                expected,
                _phantom: PhantomData::<()>,
            })
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            self.actual.new_vector_writer(options)
        }
    }

    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
}

impl StateStoreImpl {
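    // Builds the state store selected by the connection string `s`: "hummock+<object store url>"
    // for Hummock backed by a remote object store, "in_memory" / "in-memory" for the shared
    // in-memory store, "sled://<path>" for the sled-based store; anything else is unimplemented.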
    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::borrowed_box)]
    pub async fn new(
        s: &str,
        opts: Arc<StorageOpts>,
        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        storage_metrics: Arc<MonitoredStorageMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
        use_new_object_prefix_strategy: bool,
    ) -> StorageResult<Self> {
        const KB: usize = 1 << 10;
        const MB: usize = 1 << 20;

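        // The meta and data block caches are foyer hybrid caches: an in-memory tier plus an
        // optional disk tier that is only enabled when a cache directory is configured and the
        // `ElasticDiskCache` license feature is available.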
        let meta_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.meta")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .memory(opts.meta_cache_capacity_mb * MB)
                .with_shards(opts.meta_cache_shard_num)
                .with_eviction_config(opts.meta_cache_eviction_config.clone())
                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                    u64::BITS as usize / 8 + value.estimate_size()
                })
                .storage();

            if !opts.meta_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
                        .with_throttle(opts.meta_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_io_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
                        .with_flushers(opts.meta_file_cache_flushers)
                        .with_reclaimers(opts.meta_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB)
                        .with_clean_block_threshold(
                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.meta_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder.with_engine_config(engine_builder);
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        let block_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.data")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .with_event_listener(Arc::new(BlockCacheEventListener::new(
                    state_store_metrics.clone(),
                )))
                .memory(opts.block_cache_capacity_mb * MB)
                .with_shards(opts.block_cache_shard_num)
                .with_eviction_config(opts.block_cache_eviction_config.clone())
                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                    u64::BITS as usize * 2 / 8 + value.raw().len()
                })
                .storage();

            if !opts.data_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
                        .with_throttle(opts.data_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_io_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
                        .with_flushers(opts.data_file_cache_flushers)
                        .with_reclaimers(opts.data_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB)
                        .with_clean_block_threshold(
                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.data_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder.with_engine_config(engine_builder);
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
            .with_shards(opts.vector_meta_cache_shard_num)
            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
            .build();

        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
            .with_shards(opts.vector_block_cache_shard_num)
            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
            .build();

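        // Pick the recent filter used for disk cache refill: disabled when there is no data file
        // cache; a simple filter when a single shard is configured; an admit-everything filter
        // when the recent filter is skipped; a sharded filter otherwise.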
        let recent_filter = if opts.data_file_cache_dir.is_empty() {
            Arc::new(NoneRecentFilter::default().into())
        } else if opts.cache_refill_recent_filter_shards == 1 {
            Arc::new(
                SimpleRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                )
                .into(),
            )
        } else if opts.cache_refill_skip_recent_filter {
            Arc::new(AllRecentFilter::default().into())
        } else {
            Arc::new(
                ShardedRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                    opts.cache_refill_recent_filter_shards,
                )
                .into(),
            )
        };

        let store = match s {
            hummock if hummock.starts_with("hummock+") => {
                let object_store = build_remote_object_store(
                    hummock.strip_prefix("hummock+").unwrap(),
                    object_store_metrics.clone(),
                    "Hummock",
                    Arc::new(opts.object_store_config.clone()),
                )
                .await;

                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
                    store: Arc::new(object_store),
                    path: opts.data_directory.clone(),
                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
                    max_prefetch_block_number: opts.max_prefetch_block_number,
                    recent_filter,
                    state_store_metrics: state_store_metrics.clone(),
                    use_new_object_prefix_strategy,

                    meta_cache,
                    block_cache,
                    vector_meta_cache,
                    vector_block_cache,
                }));
                let notification_client =
                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
                let compaction_catalog_manager_ref =
                    Arc::new(CompactionCatalogManager::new(Box::new(
                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
                    )));

                let inner = HummockStorage::new(
                    opts.clone(),
                    sstable_store,
                    hummock_meta_client.clone(),
                    notification_client,
                    compaction_catalog_manager_ref,
                    state_store_metrics.clone(),
                    compactor_metrics.clone(),
                    await_tree_config,
                )
                .await?;

                StateStoreImpl::hummock(inner, storage_metrics)
            }

            "in_memory" | "in-memory" => {
                tracing::warn!(
                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
            }

            sled if sled.starts_with("sled://") => {
                tracing::warn!(
                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                let path = sled.strip_prefix("sled://").unwrap();
                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
            }

            other => unimplemented!("{} state store is not supported", other),
        };

        Ok(store)
    }
}

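// Backend-erasing access to the underlying `HummockStorage`. Non-Hummock stores return `None`,
// and their `sync` is a no-op that reports an empty `SyncResult`.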
pub trait AsHummock: Send + Sync {
    fn as_hummock(&self) -> Option<&HummockStorage>;

    fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
        async move {
            if let Some(hummock) = self.as_hummock() {
                hummock.sync(sync_table_epochs).await
            } else {
                Ok(SyncResult::default())
            }
        }
        .boxed()
    }
}

impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}

impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}

impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}

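// Debug-only, object-safe mirrors of the state store traits so that debug builds can erase the
// concrete store type behind `StateStorePointer` (see `may_dynamic_dispatch`).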
#[cfg(debug_assertions)]
mod dyn_state_store {
    use std::future::Future;
    use std::ops::DerefMut;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::array::VectorRef;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;
    use crate::vector::VectorDistance;

    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }

    #[async_trait::async_trait]
    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            self.try_next().await
        }
    }

    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            self.deref_mut().try_next()
        }
    }

    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;

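    // Object-safe ("Dyn*") counterparts of the read traits: generic callbacks and `impl Trait`
    // returns are replaced with owned rows and boxed iterators so the traits can live behind
    // trait objects.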
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }

    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;

    #[async_trait::async_trait]
    impl<S: StateStoreGet> DynStateStoreGet for S {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>> {
            self.on_key_value(key, read_options, move |key, value| {
                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
            })
            .await
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreRead> DynStateStoreRead for S {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            self.next_epoch(epoch, options).await
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
            Ok(Box::new(
                self.iter_log(epoch_range, key_range, options).await?,
            ))
        }
    }

    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;

    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
        async fn flush(&mut self) -> StorageResult<usize>;

        async fn try_flush(&mut self) -> StorageResult<()>;

        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
    }

    #[async_trait::async_trait]
    impl<S: LocalStateStore> DynLocalStateStore for S {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            self.insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            self.delete(key, old_val)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            self.update_vnode_bitmap(vnodes).await
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            self.get_table_watermark(vnode)
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
        async fn flush(&mut self) -> StorageResult<usize> {
            self.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            self.try_flush().await
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.init(options).await
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.seal_current_epoch(next_epoch, opts)
        }
    }

    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;

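    // Route the statically dispatched `LocalStateStore` API through the boxed trait object.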
    impl LocalStateStore for BoxDynLocalStateStore {
        type FlushedSnapshotReader = StateStoreReadDynRef;
        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;

        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            (*self.0).iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            (*self.0).rev_iter(key_range, read_options)
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            (*self.0).new_flushed_snapshot_reader()
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            (*self.0).get_table_watermark(vnode)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            (*self.0).insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            (*self.0).delete(key, old_val)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            (*self.0).update_vnode_bitmap(vnodes).await
        }
    }

    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
    where
        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
    {
        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
            self.as_mut().flush()
        }

        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().try_flush()
        }

        fn init(
            &mut self,
            options: InitOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().init(options)
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.as_mut().seal_current_epoch(next_epoch, opts)
        }
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
    }

    #[async_trait::async_trait]
    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
            self.insert(vec, info)
        }
    }

    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;

    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
        fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
            self.0.insert(vec, info)
        }
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        async fn nearest(
            &self,
            vec: VectorRef<'_>,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }

    #[async_trait::async_trait]
    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
        async fn nearest(
            &self,
            vec: VectorRef<'_>,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
            use risingwave_common::types::ScalarRef;
            self.nearest(vec, options, |vec, distance, info| {
                (
                    vec.to_owned_scalar(),
                    distance,
                    Bytes::copy_from_slice(info),
                )
            })
            .await
        }
    }

    impl<P> StateStoreReadVector for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
    {
        async fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> StorageResult<Vec<O>> {
            let output = self.as_ref().nearest(vec, options).await?;
            Ok(output
                .into_iter()
                .map(|(vec, distance, info)| {
                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
                })
                .collect())
        }
    }

    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }

    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }

    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;

    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }

    #[async_trait::async_trait]
    impl<S: StateStore> DynStateStoreExt for S {
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()> {
            self.try_wait_epoch(epoch, options).await
        }

        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
            StateStorePointer(Box::new(self.new_local(option).await))
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
            Ok(StateStorePointer(Arc::new(
                self.new_read_snapshot(epoch, options).await?,
            )))
        }

        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector {
            StateStorePointer(Box::new(self.new_vector_writer(options).await))
        }
    }

    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;

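    // The macros below generate `AsRef`/`AsMut` impls that coerce a `StateStorePointer` over one
    // dyn trait into a reference to one of that trait's supertraits.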
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);

    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );

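    // Thin newtype over an `Arc`/`Box` of a dyn state store trait object. The generated
    // `AsRef`/`AsMut` impls above let the blanket impls below forward the statically typed
    // traits to the object-safe ones.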
    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);

    impl<P> StateStoreGet for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
    {
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            let option = self.as_ref().get_keyed_row(key, read_options).await?;
            option
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<P> StateStoreRead for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
    {
        type Iter = BoxStateStoreReadIter;
        type RevIter = BoxStateStoreReadIter;

        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            self.as_ref().iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            self.as_ref().rev_iter(key_range, read_options)
        }
    }

    impl StateStoreReadLog for StateStoreDynRef {
        type ChangeLogIter = BoxStateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            (*self.0).next_epoch(epoch, options).await
        }

        fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
            (*self.0).iter_log(epoch_range, key_range, options)
        }
    }

    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}

    impl AsHummock for StateStoreDynRef {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            (*self.0).as_hummock()
        }
    }

    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}

    impl StateStore for StateStoreDynRef {
        type Local = BoxDynLocalStateStore;
        type ReadSnapshot = StateStoreReadSnapshotDynRef;
        type VectorWriter = BoxDynStateStoreWriteVector;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            (*self.0).try_wait_epoch(epoch, options)
        }

        fn new_local(
            &self,
            option: NewLocalOptions,
        ) -> impl Future<Output = Self::Local> + Send + '_ {
            (*self.0).new_local(option)
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            (*self.0).new_read_snapshot(epoch, options).await
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            (*self.0).new_vector_writer(options)
        }
    }
1527}