1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22 CacheBuilder, DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions,
23};
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
27use risingwave_common::catalog::TableId;
28use risingwave_common::license::Feature;
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common_service::RpcNotificationClient;
31use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
32use risingwave_object_store::object::build_remote_object_store;
33use thiserror_ext::AsReport;
34
35use crate::StateStore;
36use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
37use crate::error::StorageResult;
38use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
39use crate::hummock::{
40 Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
41 SstableBlockIndex, SstableStore, SstableStoreConfig,
42};
43use crate::memory::MemoryStateStore;
44use crate::memory::sled::SledStateStore;
45use crate::monitor::{
46 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
47 ObjectStoreMetrics,
48};
49use crate::opts::StorageOpts;
50
51static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
52 Box::new(PrometheusMetricsRegistry::new(
53 GLOBAL_METRICS_REGISTRY.clone(),
54 ))
55});
56
57mod opaque_type {
58 use super::*;
59
60 pub type HummockStorageType = impl StateStore + AsHummock;
61 pub type MemoryStateStoreType = impl StateStore + AsHummock;
62 pub type SledStateStoreType = impl StateStore + AsHummock;
63
64 #[define_opaque(MemoryStateStoreType)]
65 pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
66 may_dynamic_dispatch(state_store)
67 }
68
69 #[define_opaque(HummockStorageType)]
70 pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
71 may_dynamic_dispatch(may_verify(state_store))
72 }
73
74 #[define_opaque(SledStateStoreType)]
75 pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
76 may_dynamic_dispatch(state_store)
77 }
78}
79pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
80use opaque_type::{hummock, in_memory, sled};
81
/// Wrapper stack applied to a backend when the `hm-trace` feature is enabled: the store is
/// wrapped in a `TracedStateStore`, which is in turn wrapped in a `MonitoredStateStore`.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Wrapper stack applied to a backend when the `hm-trace` feature is disabled: the store is
/// wrapped directly in a `MonitoredStateStore`.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
87
88fn monitored<S: StateStore>(
89 state_store: S,
90 storage_metrics: Arc<MonitoredStorageMetrics>,
91) -> Monitored<S> {
92 let inner = {
93 #[cfg(feature = "hm-trace")]
94 {
95 crate::monitor::traced_store::TracedStateStore::new_global(state_store)
96 }
97 #[cfg(not(feature = "hm-trace"))]
98 {
99 state_store
100 }
101 };
102 inner.monitored(storage_metrics)
103}
104
105fn inner<S>(state_store: &Monitored<S>) -> &S {
106 let inner = state_store.inner();
107 {
108 #[cfg(feature = "hm-trace")]
109 {
110 inner.inner()
111 }
112 #[cfg(not(feature = "hm-trace"))]
113 {
114 inner
115 }
116 }
117}
118
/// The state store backends this file can construct, each wrapped in the [`Monitored`] stack.
///
/// `EnumAsInner` derives the per-variant accessor methods; `Debug` is implemented by hand
/// below and prints only the variant name.
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// Store built from a `HummockStorage` (see `StateStoreImpl::hummock`).
    HummockStateStore(Monitored<HummockStorageType>),
    /// Store built from a `MemoryStateStore` (see `StateStoreImpl::shared_in_memory_store`
    /// and `StateStoreImpl::for_test`).
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// Store built from a `SledStateStore` (see `StateStoreImpl::sled`).
    SledStateStore(Monitored<SledStateStoreType>),
}
139
140fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
141 #[cfg(not(debug_assertions))]
142 {
143 state_store
144 }
145 #[cfg(debug_assertions)]
146 {
147 use crate::store_impl::dyn_state_store::StateStorePointer;
148 StateStorePointer(Arc::new(state_store) as _)
149 }
150}
151
152fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
153 #[cfg(not(debug_assertions))]
154 {
155 state_store
156 }
157 #[cfg(debug_assertions)]
158 {
159 use std::marker::PhantomData;
160
161 use risingwave_common::util::env_var::env_var_is_true;
162 use tracing::info;
163
164 use crate::store_impl::verify::VerifyStateStore;
165
166 let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
167 info!("enable verify state store");
168 Some(SledStateStore::new_temp())
169 } else {
170 info!("verify state store is not enabled");
171 None
172 };
173 VerifyStateStore {
174 actual: state_store,
175 expected,
176 _phantom: PhantomData::<()>,
177 }
178 }
179}
180
181impl StateStoreImpl {
182 fn in_memory(
183 state_store: MemoryStateStore,
184 storage_metrics: Arc<MonitoredStorageMetrics>,
185 ) -> Self {
186 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
188 }
189
190 pub fn hummock(
191 state_store: HummockStorage,
192 storage_metrics: Arc<MonitoredStorageMetrics>,
193 ) -> Self {
194 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
196 }
197
198 pub fn sled(
199 state_store: SledStateStore,
200 storage_metrics: Arc<MonitoredStorageMetrics>,
201 ) -> Self {
202 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
203 }
204
205 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
206 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
207 }
208
209 pub fn for_test() -> Self {
210 Self::in_memory(
211 MemoryStateStore::new(),
212 Arc::new(MonitoredStorageMetrics::unused()),
213 )
214 }
215
216 pub fn as_hummock(&self) -> Option<&HummockStorage> {
217 match self {
218 StateStoreImpl::HummockStateStore(hummock) => {
219 Some(inner(hummock).as_hummock().expect("should be hummock"))
220 }
221 _ => None,
222 }
223 }
224}
225
226impl Debug for StateStoreImpl {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 match self {
229 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
230 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
231 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
232 }
233 }
234}
235
/// Expands `$body` once per `StateStoreImpl` variant, with `$store` bound to the payload of the
/// matched variant, so the same code is instantiated for every concrete store type.
///
/// - `$impl`: expression evaluating to a `StateStoreImpl`.
/// - `$store`: identifier the variant payload is bound to inside `$body`.
/// - `$body`: the token tree to expand in each arm.
///
/// In builds without `debug_assertions` the memory and sled arms do not expand `$body`; they
/// panic through `unimplemented!` instead. The Hummock arm always expands `$body`.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                // Only instantiate the body for the memory store in debug builds.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the payload so the pattern variable counts as used.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                // Only instantiate the body for the sled store in debug builds.
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Bind the payload so the pattern variable counts as used.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
274
275#[cfg(any(debug_assertions, test, feature = "test"))]
276pub mod verify {
277 use std::fmt::Debug;
278 use std::future::Future;
279 use std::marker::PhantomData;
280 use std::ops::Deref;
281 use std::sync::Arc;
282
283 use bytes::Bytes;
284 use risingwave_common::bitmap::Bitmap;
285 use risingwave_common::hash::VirtualNode;
286 use risingwave_hummock_sdk::HummockReadEpoch;
287 use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
288 use tracing::log::warn;
289
290 use crate::error::StorageResult;
291 use crate::hummock::HummockStorage;
292 use crate::store::*;
293 use crate::store_impl::AsHummock;
294
295 #[expect(dead_code)]
296 fn assert_result_eq<Item: PartialEq + Debug, E>(
297 first: &std::result::Result<Item, E>,
298 second: &std::result::Result<Item, E>,
299 ) {
300 match (first, second) {
301 (Ok(first), Ok(second)) => {
302 if first != second {
303 warn!("result different: {:?} {:?}", first, second);
304 }
305 assert_eq!(first, second);
306 }
307 (Err(_), Err(_)) => {}
308 _ => {
309 warn!("one success and one failed");
310 panic!("result not equal");
311 }
312 }
313 }
314
/// Pairs an `actual` value with an optional `expected` reference of the same role.
///
/// The trait impls in this module forward operations to `actual` and, when `expected` is
/// `Some`, run the same operation on it and assert that the outcomes match. The same struct is
/// used for stores and, with `T` set to the item type, for iterators (see `verify_iter`).
#[derive(Clone)]
pub struct VerifyStateStore<A, E, T = ()> {
    /// The value whose results are returned to the caller.
    pub actual: A,
    /// Optional reference the results of `actual` are checked against.
    pub expected: Option<E>,
    /// Carries the item type `T` without storing a value of it.
    pub _phantom: PhantomData<T>,
}
321
/// Exposes the `HummockStorage` of the `actual` store; `expected` is never consulted.
impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        self.actual.as_hummock()
    }
}
327
328 impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
329 async fn on_key_value<O: Send + 'static>(
330 &self,
331 key: TableKey<Bytes>,
332 read_options: ReadOptions,
333 on_key_value_fn: impl KeyValueFn<O>,
334 ) -> StorageResult<Option<O>> {
335 let actual: Option<(FullKey<Bytes>, Bytes)> = self
336 .actual
337 .on_key_value(key.clone(), read_options.clone(), |key, value| {
338 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
339 })
340 .await?;
341 if let Some(expected) = &self.expected {
342 let expected: Option<(FullKey<Bytes>, Bytes)> = expected
343 .on_key_value(key, read_options, |key, value| {
344 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
345 })
346 .await?;
347 assert_eq!(
348 actual
349 .as_ref()
350 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
351 expected
352 .as_ref()
353 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
354 );
355 }
356
357 actual
358 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
359 .transpose()
360 }
361 }
362
/// Vector search is forwarded to `actual` only; `expected` is not queried or compared.
impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
    for VerifyStateStore<A, E>
{
    fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> impl StorageFuture<'_, Vec<O>> {
        self.actual.nearest(vec, options, on_nearest_item_fn)
    }
}
375
376 impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
377 type Iter = impl StateStoreReadIter;
378 type RevIter = impl StateStoreReadIter;
379
380 #[expect(clippy::manual_async_fn)]
383 fn iter(
384 &self,
385 key_range: TableKeyRange,
386
387 read_options: ReadOptions,
388 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
389 async move {
390 let actual = self
391 .actual
392 .iter(key_range.clone(), read_options.clone())
393 .await?;
394 let expected = if let Some(expected) = &self.expected {
395 Some(expected.iter(key_range, read_options).await?)
396 } else {
397 None
398 };
399
400 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
401 }
402 }
403
404 #[expect(clippy::manual_async_fn)]
405 fn rev_iter(
406 &self,
407 key_range: TableKeyRange,
408
409 read_options: ReadOptions,
410 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
411 async move {
412 let actual = self
413 .actual
414 .rev_iter(key_range.clone(), read_options.clone())
415 .await?;
416 let expected = if let Some(expected) = &self.expected {
417 Some(expected.rev_iter(key_range, read_options).await?)
418 } else {
419 None
420 };
421
422 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
423 }
424 }
425 }
426
427 impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
428 type ChangeLogIter = impl StateStoreReadChangeLogIter;
429
430 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
431 let actual = self.actual.next_epoch(epoch, options.clone()).await?;
432 if let Some(expected) = &self.expected {
433 assert_eq!(actual, expected.next_epoch(epoch, options).await?);
434 }
435 Ok(actual)
436 }
437
438 async fn iter_log(
439 &self,
440 epoch_range: (u64, u64),
441 key_range: TableKeyRange,
442 options: ReadLogOptions,
443 ) -> StorageResult<Self::ChangeLogIter> {
444 let actual = self
445 .actual
446 .iter_log(epoch_range, key_range.clone(), options.clone())
447 .await?;
448 let expected = if let Some(expected) = &self.expected {
449 Some(expected.iter_log(epoch_range, key_range, options).await?)
450 } else {
451 None
452 };
453
454 Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
455 }
456 }
457
458 impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
459 for VerifyStateStore<A, E, T>
460 where
461 for<'a> T::ItemRef<'a>: PartialEq + Debug,
462 {
463 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
464 let actual = self.actual.try_next().await?;
465 if let Some(expected) = self.expected.as_mut() {
466 let expected = expected.try_next().await?;
467 assert_eq!(actual, expected);
468 }
469 Ok(actual)
470 }
471 }
472
/// Bundles an `actual` iterator with an optional `expected` one into a `VerifyStateStore`,
/// returned as an opaque `StateStoreIter<T>`.
///
/// `T` is the item type shared by both iterators; its borrowed form must be comparable and
/// printable so mismatches can be asserted on.
fn verify_iter<T: IterItem>(
    actual: impl StateStoreIter<T>,
    expected: Option<impl StateStoreIter<T>>,
) -> impl StateStoreIter<T>
where
    for<'a> T::ItemRef<'a>: PartialEq + Debug,
{
    VerifyStateStore {
        actual,
        expected,
        _phantom: PhantomData::<T>,
    }
}
486
487 impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
488 type FlushedSnapshotReader =
489 VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
490
491 type Iter<'a> = impl StateStoreIter + 'a;
492 type RevIter<'a> = impl StateStoreIter + 'a;
493
494 #[expect(clippy::manual_async_fn)]
495 fn iter(
496 &self,
497 key_range: TableKeyRange,
498 read_options: ReadOptions,
499 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
500 async move {
501 let actual = self
502 .actual
503 .iter(key_range.clone(), read_options.clone())
504 .await?;
505 let expected = if let Some(expected) = &self.expected {
506 Some(expected.iter(key_range, read_options).await?)
507 } else {
508 None
509 };
510
511 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
512 }
513 }
514
515 #[expect(clippy::manual_async_fn)]
516 fn rev_iter(
517 &self,
518 key_range: TableKeyRange,
519 read_options: ReadOptions,
520 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
521 async move {
522 let actual = self
523 .actual
524 .rev_iter(key_range.clone(), read_options.clone())
525 .await?;
526 let expected = if let Some(expected) = &self.expected {
527 Some(expected.rev_iter(key_range, read_options).await?)
528 } else {
529 None
530 };
531
532 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
533 }
534 }
535
536 fn insert(
537 &mut self,
538 key: TableKey<Bytes>,
539 new_val: Bytes,
540 old_val: Option<Bytes>,
541 ) -> StorageResult<()> {
542 if let Some(expected) = &mut self.expected {
543 expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
544 }
545 self.actual.insert(key, new_val, old_val)?;
546
547 Ok(())
548 }
549
550 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
551 if let Some(expected) = &mut self.expected {
552 expected.delete(key.clone(), old_val.clone())?;
553 }
554 self.actual.delete(key, old_val)?;
555 Ok(())
556 }
557
558 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
559 let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
560 if let Some(expected) = &mut self.expected {
561 assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
562 }
563 Ok(ret)
564 }
565
566 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
567 let ret = self.actual.get_table_watermark(vnode);
568 if let Some(expected) = &self.expected {
569 assert_eq!(ret, expected.get_table_watermark(vnode));
570 }
571 ret
572 }
573
574 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
575 VerifyStateStore {
576 actual: self.actual.new_flushed_snapshot_reader(),
577 expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
578 _phantom: Default::default(),
579 }
580 }
581 }
582
583 impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
584 for VerifyStateStore<A, E>
585 {
586 async fn flush(&mut self) -> StorageResult<usize> {
587 if let Some(expected) = &mut self.expected {
588 expected.flush().await?;
589 }
590 self.actual.flush().await
591 }
592
593 async fn try_flush(&mut self) -> StorageResult<()> {
594 if let Some(expected) = &mut self.expected {
595 expected.try_flush().await?;
596 }
597 self.actual.try_flush().await
598 }
599
600 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
601 self.actual.init(options.clone()).await?;
602 if let Some(expected) = &mut self.expected {
603 expected.init(options).await?;
604 }
605 Ok(())
606 }
607
608 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
609 if let Some(expected) = &mut self.expected {
610 expected.seal_current_epoch(next_epoch, opts.clone());
611 }
612 self.actual.seal_current_epoch(next_epoch, opts);
613 }
614 }
615
616 impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
617 type Local = VerifyStateStore<A::Local, E::Local>;
618 type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
619 type VectorWriter = A::VectorWriter;
620
621 fn try_wait_epoch(
622 &self,
623 epoch: HummockReadEpoch,
624 options: TryWaitEpochOptions,
625 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
626 self.actual.try_wait_epoch(epoch, options)
627 }
628
629 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
630 let expected = if let Some(expected) = &self.expected {
631 Some(expected.new_local(option.clone()).await)
632 } else {
633 None
634 };
635 VerifyStateStore {
636 actual: self.actual.new_local(option).await,
637 expected,
638 _phantom: PhantomData::<()>,
639 }
640 }
641
642 async fn new_read_snapshot(
643 &self,
644 epoch: HummockReadEpoch,
645 options: NewReadSnapshotOptions,
646 ) -> StorageResult<Self::ReadSnapshot> {
647 let expected = if let Some(expected) = &self.expected {
648 Some(expected.new_read_snapshot(epoch, options).await?)
649 } else {
650 None
651 };
652 Ok(VerifyStateStore {
653 actual: self.actual.new_read_snapshot(epoch, options).await?,
654 expected,
655 _phantom: PhantomData::<()>,
656 })
657 }
658
659 fn new_vector_writer(
660 &self,
661 options: NewVectorWriterOptions,
662 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
663 self.actual.new_vector_writer(options)
664 }
665 }
666
/// Dereferences to the `actual` value, so its inherent methods are reachable through the
/// verifying wrapper.
impl<A, E> Deref for VerifyStateStore<A, E> {
    type Target = A;

    fn deref(&self) -> &Self::Target {
        &self.actual
    }
}
674}
675
676impl StateStoreImpl {
677 #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
678 #[allow(clippy::too_many_arguments)]
679 #[expect(clippy::borrowed_box)]
680 pub async fn new(
681 s: &str,
682 opts: Arc<StorageOpts>,
683 hummock_meta_client: Arc<MonitoredHummockMetaClient>,
684 state_store_metrics: Arc<HummockStateStoreMetrics>,
685 object_store_metrics: Arc<ObjectStoreMetrics>,
686 storage_metrics: Arc<MonitoredStorageMetrics>,
687 compactor_metrics: Arc<CompactorMetrics>,
688 await_tree_config: Option<await_tree::Config>,
689 use_new_object_prefix_strategy: bool,
690 ) -> StorageResult<Self> {
691 const KB: usize = 1 << 10;
692 const MB: usize = 1 << 20;
693
694 let meta_cache = {
695 let mut builder = HybridCacheBuilder::new()
696 .with_name("foyer.meta")
697 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
698 .memory(opts.meta_cache_capacity_mb * MB)
699 .with_shards(opts.meta_cache_shard_num)
700 .with_eviction_config(opts.meta_cache_eviction_config.clone())
701 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
702 u64::BITS as usize / 8 + value.estimate_size()
703 })
704 .storage(Engine::Large(
705 LargeEngineOptions::new()
706 .with_indexer_shards(opts.meta_file_cache_indexer_shards)
707 .with_flushers(opts.meta_file_cache_flushers)
708 .with_reclaimers(opts.meta_file_cache_reclaimers)
709 .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) .with_clean_region_threshold(
711 opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
712 )
713 .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
714 .with_blob_index_size(16 * KB)
715 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
716 opts.meta_file_cache_fifo_probation_ratio,
717 ))]),
718 ));
719
720 if !opts.meta_file_cache_dir.is_empty() {
721 if let Err(e) = Feature::ElasticDiskCache.check_available() {
722 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
723 } else {
724 builder = builder
725 .with_device_options(
726 DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
727 .with_capacity(opts.meta_file_cache_capacity_mb * MB)
728 .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
729 .with_throttle(opts.meta_file_cache_throttle.clone()),
730 )
731 .with_recover_mode(opts.meta_file_cache_recover_mode)
732 .with_compression(opts.meta_file_cache_compression)
733 .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
734 }
735 }
736
737 builder.build().await.map_err(HummockError::foyer_error)?
738 };
739
740 let block_cache = {
741 let mut builder = HybridCacheBuilder::new()
742 .with_name("foyer.data")
743 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
744 .with_event_listener(Arc::new(BlockCacheEventListener::new(
745 state_store_metrics.clone(),
746 )))
747 .memory(opts.block_cache_capacity_mb * MB)
748 .with_shards(opts.block_cache_shard_num)
749 .with_eviction_config(opts.block_cache_eviction_config.clone())
750 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
751 u64::BITS as usize * 2 / 8 + value.raw().len()
753 })
754 .storage(Engine::Large(
755 LargeEngineOptions::new()
756 .with_indexer_shards(opts.data_file_cache_indexer_shards)
757 .with_flushers(opts.data_file_cache_flushers)
758 .with_reclaimers(opts.data_file_cache_reclaimers)
759 .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) .with_clean_region_threshold(
761 opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
762 )
763 .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
764 .with_blob_index_size(16 * KB)
765 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
766 opts.data_file_cache_fifo_probation_ratio,
767 ))]),
768 ));
769
770 if !opts.data_file_cache_dir.is_empty() {
771 if let Err(e) = Feature::ElasticDiskCache.check_available() {
772 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
773 } else {
774 builder = builder
775 .with_device_options(
776 DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
777 .with_capacity(opts.data_file_cache_capacity_mb * MB)
778 .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
779 .with_throttle(opts.data_file_cache_throttle.clone()),
780 )
781 .with_recover_mode(opts.data_file_cache_recover_mode)
782 .with_compression(opts.data_file_cache_compression)
783 .with_runtime_options(opts.data_file_cache_runtime_config.clone());
784 }
785 }
786
787 builder.build().await.map_err(HummockError::foyer_error)?
788 };
789
790 let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
791 .with_shards(opts.vector_meta_cache_shard_num)
792 .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
793 .build();
794
795 let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
796 .with_shards(opts.vector_block_cache_shard_num)
797 .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
798 .build();
799
800 let recent_filter = if opts.data_file_cache_dir.is_empty() {
801 None
802 } else {
803 Some(Arc::new(RecentFilter::new(
804 opts.cache_refill_recent_filter_layers,
805 Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
806 )))
807 };
808
809 let store = match s {
810 hummock if hummock.starts_with("hummock+") => {
811 let object_store = build_remote_object_store(
812 hummock.strip_prefix("hummock+").unwrap(),
813 object_store_metrics.clone(),
814 "Hummock",
815 Arc::new(opts.object_store_config.clone()),
816 )
817 .await;
818
819 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
820 store: Arc::new(object_store),
821 path: opts.data_directory.clone(),
822 prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
823 max_prefetch_block_number: opts.max_prefetch_block_number,
824 recent_filter,
825 state_store_metrics: state_store_metrics.clone(),
826 use_new_object_prefix_strategy,
827
828 meta_cache,
829 block_cache,
830 vector_meta_cache,
831 vector_block_cache,
832 }));
833 let notification_client =
834 RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
835 let compaction_catalog_manager_ref =
836 Arc::new(CompactionCatalogManager::new(Box::new(
837 RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
838 )));
839
840 let inner = HummockStorage::new(
841 opts.clone(),
842 sstable_store,
843 hummock_meta_client.clone(),
844 notification_client,
845 compaction_catalog_manager_ref,
846 state_store_metrics.clone(),
847 compactor_metrics.clone(),
848 await_tree_config,
849 )
850 .await?;
851
852 StateStoreImpl::hummock(inner, storage_metrics)
853 }
854
855 "in_memory" | "in-memory" => {
856 tracing::warn!(
857 "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
858 );
859 StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
860 }
861
862 sled if sled.starts_with("sled://") => {
863 tracing::warn!(
864 "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
865 );
866 let path = sled.strip_prefix("sled://").unwrap();
867 StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
868 }
869
870 other => unimplemented!("{} state store is not supported", other),
871 };
872
873 Ok(store)
874 }
875}
876
877pub trait AsHummock: Send + Sync {
878 fn as_hummock(&self) -> Option<&HummockStorage>;
879
880 fn sync(
881 &self,
882 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
883 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
884 async move {
885 if let Some(hummock) = self.as_hummock() {
886 hummock.sync(sync_table_epochs).await
887 } else {
888 Ok(SyncResult::default())
889 }
890 }
891 .boxed()
892 }
893}
894
/// A `HummockStorage` is its own Hummock backend.
impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
900
/// The in-memory store has no Hummock backend, so the default `sync` resolves to
/// `SyncResult::default()`.
impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
906
/// The sled store has no Hummock backend, so the default `sync` resolves to
/// `SyncResult::default()`.
impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
912
913#[cfg(debug_assertions)]
914mod dyn_state_store {
915 use std::future::Future;
916 use std::ops::DerefMut;
917 use std::sync::Arc;
918
919 use bytes::Bytes;
920 use risingwave_common::bitmap::Bitmap;
921 use risingwave_common::hash::VirtualNode;
922 use risingwave_hummock_sdk::HummockReadEpoch;
923 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
924
925 use crate::error::StorageResult;
926 use crate::hummock::HummockStorage;
927 use crate::store::*;
928 use crate::store_impl::AsHummock;
929 use crate::vector::VectorDistance;
930
/// Counterpart of `StateStoreIter` declared through `async_trait`; it is used as a trait
/// object behind `BoxStateStoreIter`.
#[async_trait::async_trait]
pub trait DynStateStoreIter<T: IterItem>: Send {
    /// Advances the iterator and returns the next item, or `None` when exhausted.
    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
}
935
/// Blanket impl: every `StateStoreIter<T>` is usable as a `DynStateStoreIter<T>` by forwarding
/// `try_next`.
#[async_trait::async_trait]
impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
    async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
        self.try_next().await
    }
}
942
/// Boxed, type-erased iterator over items of type `T` that may borrow for `'a`.
pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
/// Lets a boxed dynamic iterator be used wherever a `StateStoreIter<T>` is expected.
impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
    fn try_next(
        &mut self,
    ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
        // Go through `deref_mut` so the call targets the boxed trait object's `try_next`.
        self.deref_mut().try_next()
    }
}
951
/// Boxed `'static` iterator over keyed rows, returned by `DynStateStoreRead`.
pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
/// Boxed `'static` iterator over change-log items, returned by `DynStateStoreReadLog`.
pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
956
/// Dynamic-dispatch counterpart of `StateStoreGet`: instead of taking a generic callback, it
/// returns the row as an owned `StateStoreKeyedRow`.
#[async_trait::async_trait]
pub trait DynStateStoreGet: StaticSendSync {
    /// Looks up `key` and returns the owned keyed row, or `None` if there is no value.
    async fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>>;
}
965
/// Dynamic-dispatch counterpart of `StateStoreRead`, returning boxed iterators instead of
/// associated iterator types.
#[async_trait::async_trait]
pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
    /// Opens a boxed forward iterator over `key_range`.
    async fn iter(
        &self,
        key_range: TableKeyRange,

        read_options: ReadOptions,
    ) -> StorageResult<BoxStateStoreReadIter>;

    /// Opens a boxed reverse iterator over `key_range`.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,

        read_options: ReadOptions,
    ) -> StorageResult<BoxStateStoreReadIter>;
}
982
/// Dynamic-dispatch counterpart of `StateStoreReadLog`, returning a boxed change-log iterator.
#[async_trait::async_trait]
pub trait DynStateStoreReadLog: StaticSendSync {
    /// Forwards to `StateStoreReadLog::next_epoch` in the blanket impl.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
    /// Opens a boxed change-log iterator for `epoch_range` and `key_range`.
    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
}
993
/// Shared, type-erased read handle: a `StateStorePointer` around `Arc<dyn DynStateStoreRead>`.
pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
995
/// Blanket impl: every `StateStoreGet` is a `DynStateStoreGet`.
#[async_trait::async_trait]
impl<S: StateStoreGet> DynStateStoreGet for S {
    async fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        // Copy the borrowed key/value handed to the callback into an owned row.
        self.on_key_value(key, read_options, move |key, value| {
            Ok((key.copy_into(), Bytes::copy_from_slice(value)))
        })
        .await
    }
}
1009
/// Blanket impl: every `StateStoreRead` is a `DynStateStoreRead`; its iterators are boxed.
#[async_trait::async_trait]
impl<S: StateStoreRead> DynStateStoreRead for S {
    async fn iter(
        &self,
        key_range: TableKeyRange,

        read_options: ReadOptions,
    ) -> StorageResult<BoxStateStoreReadIter> {
        Ok(Box::new(self.iter(key_range, read_options).await?))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,

        read_options: ReadOptions,
    ) -> StorageResult<BoxStateStoreReadIter> {
        Ok(Box::new(self.rev_iter(key_range, read_options).await?))
    }
}
1030
/// Blanket impl: every `StateStoreReadLog` is a `DynStateStoreReadLog`; the change-log iterator
/// is boxed.
#[async_trait::async_trait]
impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        self.next_epoch(epoch, options).await
    }

    async fn iter_log(
        &self,
        epoch_range: (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
        Ok(Box::new(
            self.iter_log(epoch_range, key_range, options).await?,
        ))
    }
}
1048
/// Boxed keyed-row iterator that may borrow from the local store for `'a`.
pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
/// Dynamic-dispatch counterpart of `LocalStateStore`: iterators are boxed and the flushed
/// snapshot reader is returned as a `StateStoreReadDynRef`.
#[async_trait::async_trait]
pub trait DynLocalStateStore:
    DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
{
    /// Opens a boxed forward iterator over `key_range`, borrowing from `self`.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

    /// Opens a boxed reverse iterator over `key_range`, borrowing from `self`.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

    /// Returns a type-erased flushed snapshot reader.
    fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

    /// Forwards to `LocalStateStore::insert` in the blanket impl.
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Forwards to `LocalStateStore::delete` in the blanket impl.
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Forwards to `LocalStateStore::update_vnode_bitmap` in the blanket impl.
    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

    /// Forwards to `LocalStateStore::get_table_watermark` in the blanket impl.
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
}
1082
/// Dynamic-dispatch counterpart of `StateStoreWriteEpochControl`; each method forwards to the
/// method of the same name in the blanket impl.
#[async_trait::async_trait]
pub trait DynStateStoreWriteEpochControl: StaticSendSync {
    async fn flush(&mut self) -> StorageResult<usize>;

    async fn try_flush(&mut self) -> StorageResult<()>;

    async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
}
1093
/// Blanket impl: every `LocalStateStore` is a `DynLocalStateStore`. Iterators are boxed, the
/// snapshot reader is wrapped in an `Arc`-backed `StateStorePointer`, and everything else is
/// forwarded unchanged.
#[async_trait::async_trait]
impl<S: LocalStateStore> DynLocalStateStore for S {
    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
        Ok(Box::new(self.iter(key_range, read_options).await?))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
        Ok(Box::new(self.rev_iter(key_range, read_options).await?))
    }

    fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
        // Erase the concrete reader type behind `Arc<dyn DynStateStoreRead>`.
        StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        self.insert(key, new_val, old_val)
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        self.delete(key, old_val)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        self.update_vnode_bitmap(vnodes).await
    }

    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
        self.get_table_watermark(vnode)
    }
}
1137
1138 #[async_trait::async_trait]
1139 impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1140 async fn flush(&mut self) -> StorageResult<usize> {
1141 self.flush().await
1142 }
1143
1144 async fn try_flush(&mut self) -> StorageResult<()> {
1145 self.try_flush().await
1146 }
1147
1148 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1149 self.init(options).await
1150 }
1151
1152 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1153 self.seal_current_epoch(next_epoch, opts)
1154 }
1155 }
1156
1157 pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1158
1159 impl LocalStateStore for BoxDynLocalStateStore {
1160 type FlushedSnapshotReader = StateStoreReadDynRef;
1161 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1162 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1163
1164 fn iter(
1165 &self,
1166 key_range: TableKeyRange,
1167 read_options: ReadOptions,
1168 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1169 (*self.0).iter(key_range, read_options)
1170 }
1171
1172 fn rev_iter(
1173 &self,
1174 key_range: TableKeyRange,
1175 read_options: ReadOptions,
1176 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1177 (*self.0).rev_iter(key_range, read_options)
1178 }
1179
1180 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1181 (*self.0).new_flushed_snapshot_reader()
1182 }
1183
1184 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1185 (*self.0).get_table_watermark(vnode)
1186 }
1187
1188 fn insert(
1189 &mut self,
1190 key: TableKey<Bytes>,
1191 new_val: Bytes,
1192 old_val: Option<Bytes>,
1193 ) -> StorageResult<()> {
1194 (*self.0).insert(key, new_val, old_val)
1195 }
1196
1197 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1198 (*self.0).delete(key, old_val)
1199 }
1200
1201 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1202 (*self.0).update_vnode_bitmap(vnodes).await
1203 }
1204 }
1205
1206 impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1207 where
1208 StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1209 {
1210 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1211 self.as_mut().flush()
1212 }
1213
1214 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1215 self.as_mut().try_flush()
1216 }
1217
1218 fn init(
1219 &mut self,
1220 options: InitOptions,
1221 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1222 self.as_mut().init(options)
1223 }
1224
1225 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1226 self.as_mut().seal_current_epoch(next_epoch, opts)
1227 }
1228 }
1229
1230 #[async_trait::async_trait]
1231 pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1232 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
1233 }
1234
1235 #[async_trait::async_trait]
1236 impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1237 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1238 self.insert(vec, info)
1239 }
1240 }
1241
1242 pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1243
1244 impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1245 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1246 self.0.insert(vec, info)
1247 }
1248 }
1249
1250 #[async_trait::async_trait]
1253 pub trait DynStateStoreReadVector: StaticSendSync {
1254 async fn nearest(
1255 &self,
1256 vec: Vector,
1257 options: VectorNearestOptions,
1258 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1259 }
1260
1261 #[async_trait::async_trait]
1262 impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1263 async fn nearest(
1264 &self,
1265 vec: Vector,
1266 options: VectorNearestOptions,
1267 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1268 self.nearest(vec, options, |vec, distance, info| {
1269 (
1270 Vector::clone_from_ref(vec),
1271 distance,
1272 Bytes::copy_from_slice(info),
1273 )
1274 })
1275 .await
1276 }
1277 }
1278
1279 impl<P> StateStoreReadVector for StateStorePointer<P>
1280 where
1281 StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1282 {
1283 async fn nearest<O: Send + 'static>(
1284 &self,
1285 vec: Vector,
1286 options: VectorNearestOptions,
1287 on_nearest_item_fn: impl OnNearestItemFn<O>,
1288 ) -> StorageResult<Vec<O>> {
1289 let output = self.as_ref().nearest(vec, options).await?;
1290 Ok(output
1291 .into_iter()
1292 .map(|(vec, distance, info)| {
1293 on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1294 })
1295 .collect())
1296 }
1297 }
1298
1299 pub trait DynStateStoreReadSnapshot:
1300 DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1301 {
1302 }
1303
1304 impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1305 for S
1306 {
1307 }
1308
1309 pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1310 #[async_trait::async_trait]
1311 pub trait DynStateStoreExt: StaticSendSync {
1312 async fn try_wait_epoch(
1313 &self,
1314 epoch: HummockReadEpoch,
1315 options: TryWaitEpochOptions,
1316 ) -> StorageResult<()>;
1317
1318 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1319 async fn new_read_snapshot(
1320 &self,
1321 epoch: HummockReadEpoch,
1322 options: NewReadSnapshotOptions,
1323 ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1324 async fn new_vector_writer(
1325 &self,
1326 options: NewVectorWriterOptions,
1327 ) -> BoxDynStateStoreWriteVector;
1328 }
1329
1330 #[async_trait::async_trait]
1331 impl<S: StateStore> DynStateStoreExt for S {
1332 async fn try_wait_epoch(
1333 &self,
1334 epoch: HummockReadEpoch,
1335 options: TryWaitEpochOptions,
1336 ) -> StorageResult<()> {
1337 self.try_wait_epoch(epoch, options).await
1338 }
1339
1340 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1341 StateStorePointer(Box::new(self.new_local(option).await))
1342 }
1343
1344 async fn new_read_snapshot(
1345 &self,
1346 epoch: HummockReadEpoch,
1347 options: NewReadSnapshotOptions,
1348 ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1349 Ok(StateStorePointer(Arc::new(
1350 self.new_read_snapshot(epoch, options).await?,
1351 )))
1352 }
1353
1354 async fn new_vector_writer(
1355 &self,
1356 options: NewVectorWriterOptions,
1357 ) -> BoxDynStateStoreWriteVector {
1358 StateStorePointer(Box::new(self.new_vector_writer(options).await))
1359 }
1360 }
1361
1362 pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1363
1364 macro_rules! state_store_pointer_dyn_as_ref {
1365 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1366 impl AsRef<dyn $target_dyn_trait>
1367 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1368 {
1369 fn as_ref(&self) -> &dyn $target_dyn_trait {
1370 (&*self.0) as _
1371 }
1372 }
1373 };
1374 }
1375
1376 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1377 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1378 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1379 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1380 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1381 state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1382
1383 macro_rules! state_store_pointer_dyn_as_mut {
1384 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1385 impl AsMut<dyn $target_dyn_trait>
1386 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1387 {
1388 fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1389 (&mut *self.0) as _
1390 }
1391 }
1392 };
1393 }
1394
1395 state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1396 state_store_pointer_dyn_as_mut!(
1397 Box<dyn DynStateStoreWriteVector>,
1398 DynStateStoreWriteEpochControl
1399 );
1400
/// Newtype around a pointer `P` (in this module a `Box` or `Arc` of a `Dyn*` trait
/// object). The statically typed state-store traits are implemented on this wrapper
/// rather than on the bare pointer.
#[derive(Clone)]
pub struct StateStorePointer<P>(pub(crate) P);
1403
1404 impl<P> StateStoreGet for StateStorePointer<P>
1405 where
1406 StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1407 {
1408 async fn on_key_value<O: Send + 'static>(
1409 &self,
1410 key: TableKey<Bytes>,
1411 read_options: ReadOptions,
1412 on_key_value_fn: impl KeyValueFn<O>,
1413 ) -> StorageResult<Option<O>> {
1414 let option = self.as_ref().get_keyed_row(key, read_options).await?;
1415 option
1416 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1417 .transpose()
1418 }
1419 }
1420
1421 impl<P> StateStoreRead for StateStorePointer<P>
1422 where
1423 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1424 {
1425 type Iter = BoxStateStoreReadIter;
1426 type RevIter = BoxStateStoreReadIter;
1427
1428 fn iter(
1429 &self,
1430 key_range: TableKeyRange,
1431
1432 read_options: ReadOptions,
1433 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1434 self.as_ref().iter(key_range, read_options)
1435 }
1436
1437 fn rev_iter(
1438 &self,
1439 key_range: TableKeyRange,
1440
1441 read_options: ReadOptions,
1442 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1443 self.as_ref().rev_iter(key_range, read_options)
1444 }
1445 }
1446
1447 impl StateStoreReadLog for StateStoreDynRef {
1448 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1449
1450 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1451 (*self.0).next_epoch(epoch, options).await
1452 }
1453
1454 fn iter_log(
1455 &self,
1456 epoch_range: (u64, u64),
1457 key_range: TableKeyRange,
1458 options: ReadLogOptions,
1459 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1460 (*self.0).iter_log(epoch_range, key_range, options)
1461 }
1462 }
1463
1464 pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1465
1466 impl AsHummock for StateStoreDynRef {
1467 fn as_hummock(&self) -> Option<&HummockStorage> {
1468 (*self.0).as_hummock()
1469 }
1470 }
1471
1472 impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1473
1474 impl StateStore for StateStoreDynRef {
1475 type Local = BoxDynLocalStateStore;
1476 type ReadSnapshot = StateStoreReadSnapshotDynRef;
1477 type VectorWriter = BoxDynStateStoreWriteVector;
1478
1479 fn try_wait_epoch(
1480 &self,
1481 epoch: HummockReadEpoch,
1482 options: TryWaitEpochOptions,
1483 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1484 (*self.0).try_wait_epoch(epoch, options)
1485 }
1486
1487 fn new_local(
1488 &self,
1489 option: NewLocalOptions,
1490 ) -> impl Future<Output = Self::Local> + Send + '_ {
1491 (*self.0).new_local(option)
1492 }
1493
1494 async fn new_read_snapshot(
1495 &self,
1496 epoch: HummockReadEpoch,
1497 options: NewReadSnapshotOptions,
1498 ) -> StorageResult<Self::ReadSnapshot> {
1499 (*self.0).new_read_snapshot(epoch, options).await
1500 }
1501
1502 fn new_vector_writer(
1503 &self,
1504 options: NewVectorWriterOptions,
1505 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1506 (*self.0).new_vector_writer(options)
1507 }
1508 }
1509}