1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22 BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23 HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45 Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46 SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52 ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
/// Process-wide metrics registry handed to every foyer cache builder below,
/// wired into the global Prometheus registry. Lazily initialized on first use;
/// builders receive a clone of the boxed registry.
static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});
61
/// Opaque (TAIT) aliases for the fully wrapped store types.
///
/// The concrete wrapper stack around each store differs between debug and
/// release builds (see `may_dynamic_dispatch` / `may_verify`), so the enum
/// variants of `StateStoreImpl` name these `impl Trait` aliases instead of
/// spelling out the wrappers. `#[define_opaque]` marks the function whose
/// body determines each alias's hidden type.
mod opaque_type {
    use super::*;

    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Wraps a memory store (dynamic dispatch in debug builds only).
    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Wraps hummock; only this backend additionally gets the optional
    /// verify layer (see `may_verify`).
    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Wraps a sled store (dynamic dispatch in debug builds only).
    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
// `Monitored<S>` is the standard monitoring wrapper around a store `S`.
// When the `hm-trace` feature is on, a tracing layer sits between the
// monitor and the store.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
92
/// Wraps `state_store` in the monitoring layer (and, under the `hm-trace`
/// feature, a global tracing layer beneath the monitor).
fn monitored<S: StateStore>(
    state_store: S,
    storage_metrics: Arc<MonitoredStorageMetrics>,
) -> Monitored<S> {
    // Choose the inner store at compile time to match the `Monitored` alias.
    let inner = {
        #[cfg(feature = "hm-trace")]
        {
            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            state_store
        }
    };
    inner.monitored(storage_metrics)
}
109
/// Strips the monitoring (and, with `hm-trace`, tracing) wrappers off a
/// [`Monitored`] store, returning a reference to the underlying `S`.
fn inner<S>(state_store: &Monitored<S>) -> &S {
    let inner = state_store.inner();
    {
        #[cfg(feature = "hm-trace")]
        {
            // One extra layer to peel: monitor -> tracer -> store.
            inner.inner()
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            inner
        }
    }
}
123
/// The runtime-dispatched top-level state store handle.
///
/// Each variant holds a fully wrapped (opaque-typed + monitored) store:
/// - `HummockStateStore`: the production LSM-tree-backed store.
/// - `MemoryStateStore`: in-memory store, debug/test builds only.
/// - `SledStateStore`: sled-backed store, debug/test builds only.
#[derive(Clone, EnumAsInner)]
#[expect(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    HummockStateStore(Monitored<HummockStorageType>),
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    SledStateStore(Monitored<SledStateStoreType>),
}
144
/// In debug builds, erases the concrete store type behind a trait-object
/// pointer so all backends share one compiled code path (keeps debug build
/// times down); in release builds this is the identity function.
fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use crate::store_impl::dyn_state_store::StateStorePointer;
        // `as _` performs the unsized coercion to the dyn store trait object.
        StateStorePointer(Arc::new(state_store) as _)
    }
}
156
/// In debug builds, optionally wraps the store in a
/// [`verify::VerifyStateStore`] that mirrors every operation against a
/// temporary sled store and asserts both sides agree. Verification is gated
/// at runtime by the `ENABLE_STATE_STORE_VERIFY` environment variable; when
/// unset, `expected` is `None` and the wrapper is pass-through. In release
/// builds this is the identity function.
fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use std::marker::PhantomData;

        use risingwave_common::util::env_var::env_var_is_true;
        use tracing::info;

        use crate::store_impl::verify::VerifyStateStore;

        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
            info!("enable verify state store");
            Some(SledStateStore::new_temp())
        } else {
            info!("verify state store is not enabled");
            None
        };
        VerifyStateStore {
            actual: state_store,
            expected,
            _phantom: PhantomData::<()>,
        }
    }
}
185
186impl StateStoreImpl {
187 fn in_memory(
188 state_store: MemoryStateStore,
189 storage_metrics: Arc<MonitoredStorageMetrics>,
190 ) -> Self {
191 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193 }
194
195 pub fn hummock(
196 state_store: HummockStorage,
197 storage_metrics: Arc<MonitoredStorageMetrics>,
198 ) -> Self {
199 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201 }
202
203 pub fn sled(
204 state_store: SledStateStore,
205 storage_metrics: Arc<MonitoredStorageMetrics>,
206 ) -> Self {
207 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208 }
209
210 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212 }
213
214 pub fn for_test() -> Self {
215 Self::in_memory(
216 MemoryStateStore::new(),
217 Arc::new(MonitoredStorageMetrics::unused()),
218 )
219 }
220
221 pub fn as_hummock(&self) -> Option<&HummockStorage> {
222 match self {
223 StateStoreImpl::HummockStateStore(hummock) => {
224 Some(inner(hummock).as_hummock().expect("should be hummock"))
225 }
226 _ => None,
227 }
228 }
229}
230
231impl Debug for StateStoreImpl {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 match self {
234 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237 }
238 }
239}
240
/// Dispatches `$body` with `$store` bound to the monitored store inside
/// `$impl`, whatever the variant.
///
/// In release builds only the hummock arm is functional: the memory and sled
/// arms consume the binding as `_store` and panic, because those backends are
/// debug/test-only (their dispatch code is compiled out to keep release
/// binaries small).
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
279
/// Debug/test-only verification layer.
///
/// [`VerifyStateStore`] forwards every operation to an `actual` store and,
/// when an `expected` reference store is present, replays the operation there
/// too and asserts both sides observe the same data. Constructed by
/// [`super::may_verify`], which pairs `HummockStorage` with a temporary
/// `SledStateStore`.
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::Deref;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::array::VectorRef;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
    use tracing::log::warn;

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;

    /// Asserts that two results agree: both `Ok` with equal payloads, or
    /// both `Err`. Logs the mismatch before panicking to ease debugging.
    #[expect(dead_code)]
    fn assert_result_eq<Item: PartialEq + Debug, E>(
        first: &std::result::Result<Item, E>,
        second: &std::result::Result<Item, E>,
    ) {
        match (first, second) {
            (Ok(first), Ok(second)) => {
                if first != second {
                    warn!("result different: {:?} {:?}", first, second);
                }
                assert_eq!(first, second);
            }
            // Errors are not compared structurally: any pair of failures
            // counts as agreement.
            (Err(_), Err(_)) => {}
            _ => {
                warn!("one success and one failed");
                panic!("result not equal");
            }
        }
    }

    /// Store wrapper that mirrors operations onto an optional reference store
    /// and asserts agreement.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        // The store under test; its results are what callers receive.
        pub actual: A,
        // Optional reference store; `None` disables verification entirely.
        pub expected: Option<E>,
        // Carries the iterator item type for the `StateStoreIter` impl
        // (defaults to `()` for non-iterator wrappers).
        pub _phantom: PhantomData<T>,
    }

    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            // Only the actual store's identity is exposed to callers.
            self.actual.as_hummock()
        }
    }

    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        async fn on_key_value<'a, O: Send + 'a>(
            &'a self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<'a, O>,
        ) -> StorageResult<Option<O>> {
            // Materialize both sides into owned (key, value) pairs so they
            // can be compared before invoking the caller's closure.
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                // NOTE(review): the projected pure epoch is compared alongside
                // the full item, which itself contains the epoch (with gap) —
                // the projection looks redundant; confirm intent.
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            // Only after verification is the caller's closure applied, to the
            // actual side's result.
            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        // Vector nearest-neighbor queries are not verified; only the actual
        // store is consulted.
        fn nearest<'a, O: Send + 'a>(
            &'a self,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> impl StorageFuture<'a, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }

    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        // Opaque iterator types: the concrete type is the verifying iterator
        // returned by `verify_iter`.
        type Iter = impl StateStoreReadIter;
        type RevIter = impl StateStoreReadIter;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                // Pair the two iterators; items are compared lazily as the
                // caller advances.
                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }

    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
        type ChangeLogIter = impl StateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
            if let Some(expected) = &self.expected {
                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
            }
            Ok(actual)
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<Self::ChangeLogIter> {
            let actual = self
                .actual
                .iter_log(epoch_range, key_range.clone(), options.clone())
                .await?;
            let expected = if let Some(expected) = &self.expected {
                Some(expected.iter_log(epoch_range, key_range, options).await?)
            } else {
                None
            };

            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
        }
    }

    // A verifying iterator: advances both sides in lockstep and asserts each
    // yielded item (including end-of-stream `None`) is identical.
    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }

    /// Pairs an actual iterator with an optional expected one into a
    /// verifying iterator (see the `StateStoreIter` impl above).
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }

    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        type Iter<'a> = impl StateStoreIter + 'a;
        type RevIter<'a> = impl StateStoreIter + 'a;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        // Writes go to the expected store first, then the actual one; no
        // result comparison is needed since both return `()` on success.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            // The snapshot reader keeps verifying: both sides are wrapped.
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }

    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
        for VerifyStateStore<A, E>
    {
        // Epoch-control calls are mirrored on both sides; only the actual
        // side's flush size is reported.
        async fn flush(&mut self) -> StorageResult<usize> {
            if let Some(expected) = &mut self.expected {
                expected.flush().await?;
            }
            self.actual.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.try_flush().await?;
            }
            self.actual.try_flush().await
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.actual.init(options.clone()).await?;
            if let Some(expected) = &mut self.expected {
                expected.init(options).await?;
            }
            Ok(())
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            if let Some(expected) = &mut self.expected {
                expected.seal_current_epoch(next_epoch, opts.clone());
            }
            self.actual.seal_current_epoch(next_epoch, opts);
        }
    }

    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
        type Local = VerifyStateStore<A::Local, E::Local>;
        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
        // Vector writes are not verified; the actual writer is used directly.
        type VectorWriter = A::VectorWriter;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            // Epoch waiting is delegated to the actual store only.
            self.actual.try_wait_epoch(epoch, options)
        }

        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_local(option.clone()).await)
            } else {
                None
            };
            VerifyStateStore {
                actual: self.actual.new_local(option).await,
                expected,
                _phantom: PhantomData::<()>,
            }
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_read_snapshot(epoch, options).await?)
            } else {
                None
            };
            Ok(VerifyStateStore {
                actual: self.actual.new_read_snapshot(epoch, options).await?,
                expected,
                _phantom: PhantomData::<()>,
            })
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            self.actual.new_vector_writer(options)
        }
    }

    // Convenience: any method not intercepted above can be reached on the
    // actual store through deref.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
}
681
impl StateStoreImpl {
    /// Builds a [`StateStoreImpl`] from the state store URL `s`:
    ///
    /// - `hummock+<object store url>` — the production hummock store backed by
    ///   a remote object store; constructs the hybrid (memory + optional disk)
    ///   meta/block caches and the vector caches first.
    /// - `in_memory` / `in-memory` — shared in-memory store (tests only).
    /// - `sled://<path>` — sled store at `path` (tests only).
    ///
    /// Any other scheme panics via `unimplemented!`.
    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
    #[expect(clippy::borrowed_box)]
    pub async fn new(
        s: &str,
        opts: Arc<StorageOpts>,
        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        storage_metrics: Arc<MonitoredStorageMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
        use_new_object_prefix_strategy: bool,
    ) -> StorageResult<Self> {
        // Opts express capacities in MB/KB; convert to bytes below.
        const KB: usize = 1 << 10;
        const MB: usize = 1 << 20;

        // Hybrid cache for sstable metadata: object id -> boxed `Sstable`.
        // Weight = 8-byte key + estimated metadata size.
        let meta_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.meta")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .memory(opts.meta_cache_capacity_mb * MB)
                .with_shards(opts.meta_cache_shard_num)
                .with_eviction_config(opts.meta_cache_eviction_config.clone())
                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                    u64::BITS as usize / 8 + value.estimate_size()
                })
                .storage();

            // Optional disk tier: only configured when a cache dir is set AND
            // the licensed ElasticDiskCache feature is available; otherwise
            // warn and stay memory-only.
            if !opts.meta_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
                        .with_throttle(opts.meta_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
                        .with_flushers(opts.meta_file_cache_flushers)
                        .with_reclaimers(opts.meta_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB)
                        // Keep 1.5x the reclaimer count of clean blocks ready.
                        .with_clean_block_threshold(
                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.meta_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder
                        .with_engine_config(engine_builder)
                        .with_recover_mode(opts.meta_file_cache_recover_mode)
                        .with_compression(opts.meta_file_cache_compression)
                        .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        // Hybrid cache for sstable data blocks: (object id, block idx) ->
        // boxed `Block`. Weight = two 8-byte key components + raw block len.
        // Mirrors the meta cache construction with the data-cache options.
        let block_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.data")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .with_event_listener(Arc::new(BlockCacheEventListener::new(
                    state_store_metrics.clone(),
                )))
                .memory(opts.block_cache_capacity_mb * MB)
                .with_shards(opts.block_cache_shard_num)
                .with_eviction_config(opts.block_cache_eviction_config.clone())
                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                    u64::BITS as usize * 2 / 8 + value.raw().len()
                })
                .storage();

            if !opts.data_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
                        .with_throttle(opts.data_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
                        .with_flushers(opts.data_file_cache_flushers)
                        .with_reclaimers(opts.data_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB)
                        .with_clean_block_threshold(
                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.data_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder
                        .with_engine_config(engine_builder)
                        .with_recover_mode(opts.data_file_cache_recover_mode)
                        .with_compression(opts.data_file_cache_compression)
                        .with_runtime_options(opts.data_file_cache_runtime_config.clone());
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        // Vector-index caches are memory-only.
        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
            .with_shards(opts.vector_meta_cache_shard_num)
            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
            .build();

        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
            .with_shards(opts.vector_block_cache_shard_num)
            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
            .build();

        // Recent filter used by cache refill: disabled without a data file
        // cache; otherwise simple (single shard), pass-all (skip requested),
        // or sharded.
        //
        // NOTE(review): `cache_refill_skip_recent_filter` is only consulted
        // when `cache_refill_recent_filter_shards != 1` — with exactly one
        // shard the skip flag is ignored and `SimpleRecentFilter` is built
        // anyway. Confirm this precedence is intended.
        let recent_filter = if opts.data_file_cache_dir.is_empty() {
            Arc::new(NoneRecentFilter::default().into())
        } else if opts.cache_refill_recent_filter_shards == 1 {
            Arc::new(
                SimpleRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                )
                .into(),
            )
        } else if opts.cache_refill_skip_recent_filter {
            Arc::new(AllRecentFilter::default().into())
        } else {
            Arc::new(
                ShardedRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                    opts.cache_refill_recent_filter_shards,
                )
                .into(),
            )
        };

        // Dispatch on the URL scheme.
        let store = match s {
            hummock if hummock.starts_with("hummock+") => {
                // Everything after "hummock+" is the object store URL.
                let object_store = build_remote_object_store(
                    hummock.strip_prefix("hummock+").unwrap(),
                    object_store_metrics.clone(),
                    "Hummock",
                    Arc::new(opts.object_store_config.clone()),
                )
                .await;

                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
                    store: Arc::new(object_store),
                    path: opts.data_directory.clone(),
                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
                    max_prefetch_block_number: opts.max_prefetch_block_number,
                    recent_filter,
                    state_store_metrics: state_store_metrics.clone(),
                    use_new_object_prefix_strategy,
                    skip_bloom_filter_in_serde: opts.sst_skip_bloom_filter_in_serde,

                    meta_cache,
                    block_cache,
                    vector_meta_cache,
                    vector_block_cache,
                }));
                // Meta-service plumbing: notifications and remote catalog access
                // share the underlying meta client.
                let notification_client =
                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
                let compaction_catalog_manager_ref =
                    Arc::new(CompactionCatalogManager::new(Box::new(
                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
                    )));

                let inner = HummockStorage::new(
                    opts.clone(),
                    sstable_store,
                    hummock_meta_client.clone(),
                    notification_client,
                    compaction_catalog_manager_ref,
                    state_store_metrics.clone(),
                    compactor_metrics.clone(),
                    await_tree_config,
                )
                .await?;

                StateStoreImpl::hummock(inner, storage_metrics)
            }

            "in_memory" | "in-memory" => {
                tracing::warn!(
                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
            }

            sled if sled.starts_with("sled://") => {
                tracing::warn!(
                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                let path = sled.strip_prefix("sled://").unwrap();
                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
            }

            other => unimplemented!("{} state store is not supported", other),
        };

        Ok(store)
    }
}
900
901pub trait AsHummock: Send + Sync {
902 fn as_hummock(&self) -> Option<&HummockStorage>;
903
904 fn sync(
905 &self,
906 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
907 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
908 async move {
909 if let Some(hummock) = self.as_hummock() {
910 hummock.sync(sync_table_epochs).await
911 } else {
912 Ok(SyncResult::default())
913 }
914 }
915 .boxed()
916 }
917}
918
// Hummock is, of course, hummock.
impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}
924
// Memory store has no hummock backend; `sync` is therefore a no-op for it.
impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
930
// Sled store has no hummock backend; `sync` is therefore a no-op for it.
impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
936
937#[cfg(debug_assertions)]
938mod dyn_state_store {
939 use std::future::Future;
940 use std::ops::DerefMut;
941 use std::sync::Arc;
942
943 use bytes::Bytes;
944 use risingwave_common::array::VectorRef;
945 use risingwave_common::bitmap::Bitmap;
946 use risingwave_common::hash::VirtualNode;
947 use risingwave_hummock_sdk::HummockReadEpoch;
948 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
949
950 use crate::error::StorageResult;
951 use crate::hummock::HummockStorage;
952 use crate::store::*;
953 use crate::store_impl::AsHummock;
954 use crate::vector::VectorDistance;
955
956 #[async_trait::async_trait]
957 pub trait DynStateStoreIter<T: IterItem>: Send {
958 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
959 }
960
961 #[async_trait::async_trait]
962 impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
963 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
964 self.try_next().await
965 }
966 }
967
968 pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
969 impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
970 fn try_next(
971 &mut self,
972 ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
973 self.deref_mut().try_next()
974 }
975 }
976
977 pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
980 pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
981
982 #[async_trait::async_trait]
983 pub trait DynStateStoreGet: StaticSendSync {
984 async fn get_keyed_row(
985 &self,
986 key: TableKey<Bytes>,
987 read_options: ReadOptions,
988 ) -> StorageResult<Option<StateStoreKeyedRow>>;
989 }
990
991 #[async_trait::async_trait]
992 pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
993 async fn iter(
994 &self,
995 key_range: TableKeyRange,
996
997 read_options: ReadOptions,
998 ) -> StorageResult<BoxStateStoreReadIter>;
999
1000 async fn rev_iter(
1001 &self,
1002 key_range: TableKeyRange,
1003
1004 read_options: ReadOptions,
1005 ) -> StorageResult<BoxStateStoreReadIter>;
1006 }
1007
1008 #[async_trait::async_trait]
1009 pub trait DynStateStoreReadLog: StaticSendSync {
1010 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
1011 async fn iter_log(
1012 &self,
1013 epoch_range: (u64, u64),
1014 key_range: TableKeyRange,
1015 options: ReadLogOptions,
1016 ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
1017 }
1018
1019 pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1020
1021 #[async_trait::async_trait]
1022 impl<S: StateStoreGet> DynStateStoreGet for S {
1023 async fn get_keyed_row(
1024 &self,
1025 key: TableKey<Bytes>,
1026 read_options: ReadOptions,
1027 ) -> StorageResult<Option<StateStoreKeyedRow>> {
1028 self.on_key_value(key, read_options, move |key, value| {
1029 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1030 })
1031 .await
1032 }
1033 }
1034
1035 #[async_trait::async_trait]
1036 impl<S: StateStoreRead> DynStateStoreRead for S {
1037 async fn iter(
1038 &self,
1039 key_range: TableKeyRange,
1040
1041 read_options: ReadOptions,
1042 ) -> StorageResult<BoxStateStoreReadIter> {
1043 Ok(Box::new(self.iter(key_range, read_options).await?))
1044 }
1045
1046 async fn rev_iter(
1047 &self,
1048 key_range: TableKeyRange,
1049
1050 read_options: ReadOptions,
1051 ) -> StorageResult<BoxStateStoreReadIter> {
1052 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1053 }
1054 }
1055
1056 #[async_trait::async_trait]
1057 impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1058 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1059 self.next_epoch(epoch, options).await
1060 }
1061
1062 async fn iter_log(
1063 &self,
1064 epoch_range: (u64, u64),
1065 key_range: TableKeyRange,
1066 options: ReadLogOptions,
1067 ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1068 Ok(Box::new(
1069 self.iter_log(epoch_range, key_range, options).await?,
1070 ))
1071 }
1072 }
1073
1074 pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
/// Object-safe mirror of [`LocalStateStore`], used behind
/// [`BoxDynLocalStateStore`] for dynamic dispatch. Each method corresponds to
/// the same-named method on the static trait (see the blanket impl below).
#[async_trait::async_trait]
pub trait DynLocalStateStore:
    DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
{
    /// Forward iterator over `key_range`; the concrete iterator is boxed.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

    /// Reverse iterator over `key_range`; the concrete iterator is boxed.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

    /// Type-erased reader over the data this local store has already flushed.
    fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

    /// Object-safe mirror of [`LocalStateStore::insert`].
    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()>;

    /// Object-safe mirror of [`LocalStateStore::delete`].
    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

    /// Object-safe mirror of [`LocalStateStore::update_vnode_bitmap`].
    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

    /// Object-safe mirror of [`LocalStateStore::get_table_watermark`].
    fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
}
1107
1108 #[async_trait::async_trait]
1109 pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1110 async fn flush(&mut self) -> StorageResult<usize>;
1111
1112 async fn try_flush(&mut self) -> StorageResult<()>;
1113
1114 async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1115
1116 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1117 }
1118
1119 #[async_trait::async_trait]
1120 impl<S: LocalStateStore> DynLocalStateStore for S {
1121 async fn iter(
1122 &self,
1123 key_range: TableKeyRange,
1124 read_options: ReadOptions,
1125 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1126 Ok(Box::new(self.iter(key_range, read_options).await?))
1127 }
1128
1129 async fn rev_iter(
1130 &self,
1131 key_range: TableKeyRange,
1132 read_options: ReadOptions,
1133 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1134 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1135 }
1136
1137 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1138 StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1139 }
1140
1141 fn insert(
1142 &mut self,
1143 key: TableKey<Bytes>,
1144 new_val: Bytes,
1145 old_val: Option<Bytes>,
1146 ) -> StorageResult<()> {
1147 self.insert(key, new_val, old_val)
1148 }
1149
1150 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1151 self.delete(key, old_val)
1152 }
1153
1154 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1155 self.update_vnode_bitmap(vnodes).await
1156 }
1157
1158 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1159 self.get_table_watermark(vnode)
1160 }
1161 }
1162
1163 #[async_trait::async_trait]
1164 impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1165 async fn flush(&mut self) -> StorageResult<usize> {
1166 self.flush().await
1167 }
1168
1169 async fn try_flush(&mut self) -> StorageResult<()> {
1170 self.try_flush().await
1171 }
1172
1173 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1174 self.init(options).await
1175 }
1176
1177 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1178 self.seal_current_epoch(next_epoch, opts)
1179 }
1180 }
1181
/// Owned, type-erased handle to a local (writable) state store.
pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1183
1184 impl LocalStateStore for BoxDynLocalStateStore {
1185 type FlushedSnapshotReader = StateStoreReadDynRef;
1186 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1187 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1188
1189 fn iter(
1190 &self,
1191 key_range: TableKeyRange,
1192 read_options: ReadOptions,
1193 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1194 (*self.0).iter(key_range, read_options)
1195 }
1196
1197 fn rev_iter(
1198 &self,
1199 key_range: TableKeyRange,
1200 read_options: ReadOptions,
1201 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1202 (*self.0).rev_iter(key_range, read_options)
1203 }
1204
1205 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1206 (*self.0).new_flushed_snapshot_reader()
1207 }
1208
1209 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1210 (*self.0).get_table_watermark(vnode)
1211 }
1212
1213 fn insert(
1214 &mut self,
1215 key: TableKey<Bytes>,
1216 new_val: Bytes,
1217 old_val: Option<Bytes>,
1218 ) -> StorageResult<()> {
1219 (*self.0).insert(key, new_val, old_val)
1220 }
1221
1222 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1223 (*self.0).delete(key, old_val)
1224 }
1225
1226 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1227 (*self.0).update_vnode_bitmap(vnodes).await
1228 }
1229 }
1230
1231 impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1232 where
1233 StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1234 {
1235 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1236 self.as_mut().flush()
1237 }
1238
1239 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1240 self.as_mut().try_flush()
1241 }
1242
1243 fn init(
1244 &mut self,
1245 options: InitOptions,
1246 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1247 self.as_mut().init(options)
1248 }
1249
1250 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1251 self.as_mut().seal_current_epoch(next_epoch, opts)
1252 }
1253 }
1254
/// Object-safe mirror of [`StateStoreWriteVector`].
///
/// NOTE(review): this trait has no `async fn`, so `#[async_trait]` is
/// effectively a no-op here — kept for consistency with the sibling dyn traits.
#[async_trait::async_trait]
pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
    /// Object-safe mirror of [`StateStoreWriteVector::insert`].
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
}
1259
1260 #[async_trait::async_trait]
1261 impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1262 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1263 self.insert(vec, info)
1264 }
1265 }
1266
/// Owned, type-erased handle to a vector writer.
pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1268
1269 impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1270 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1271 self.0.insert(vec, info)
1272 }
1273 }
1274
/// Object-safe mirror of [`StateStoreReadVector`], returning owned hits instead
/// of invoking a generic per-item callback.
#[async_trait::async_trait]
pub trait DynStateStoreReadVector: StaticSendSync {
    /// Nearest-neighbor query; each hit is `(vector, distance, info bytes)`.
    async fn nearest(
        &self,
        vec: VectorRef<'_>,
        options: VectorNearestOptions,
    ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
}
1285
1286 #[async_trait::async_trait]
1287 impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1288 async fn nearest(
1289 &self,
1290 vec: VectorRef<'_>,
1291 options: VectorNearestOptions,
1292 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1293 use risingwave_common::types::ScalarRef;
1294 self.nearest(vec, options, |vec, distance, info| {
1295 (
1296 vec.to_owned_scalar(),
1297 distance,
1298 Bytes::copy_from_slice(info),
1299 )
1300 })
1301 .await
1302 }
1303 }
1304
1305 impl<P> StateStoreReadVector for StateStorePointer<P>
1306 where
1307 StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1308 {
1309 async fn nearest<'a, O: Send + 'a>(
1310 &'a self,
1311 vec: VectorRef<'a>,
1312 options: VectorNearestOptions,
1313 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1314 ) -> StorageResult<Vec<O>> {
1315 let output = self.as_ref().nearest(vec, options).await?;
1316 Ok(output
1317 .into_iter()
1318 .map(|(vec, distance, info)| {
1319 on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1320 })
1321 .collect())
1322 }
1323 }
1324
/// Object-safe union of point/range reads and vector reads — everything a read
/// snapshot must support behind a trait object.
pub trait DynStateStoreReadSnapshot:
    DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
{
}

/// Blanket impl: any type with both read capabilities is a read snapshot.
impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
    for S
{
}

/// Shared, type-erased handle to a read snapshot.
pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
/// Object-safe mirror of the [`StateStore`] entry points, returning type-erased
/// handles in place of the static trait's associated types.
#[async_trait::async_trait]
pub trait DynStateStoreExt: StaticSendSync {
    /// Object-safe mirror of [`StateStore::try_wait_epoch`].
    async fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()>;

    /// Object-safe mirror of [`StateStore::new_local`]; the local store is boxed.
    async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
    /// Object-safe mirror of [`StateStore::new_read_snapshot`]; the snapshot is
    /// shared behind an `Arc`.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<StateStoreReadSnapshotDynRef>;
    /// Object-safe mirror of [`StateStore::new_vector_writer`]; the writer is boxed.
    async fn new_vector_writer(
        &self,
        options: NewVectorWriterOptions,
    ) -> BoxDynStateStoreWriteVector;
}
1355
1356 #[async_trait::async_trait]
1357 impl<S: StateStore> DynStateStoreExt for S {
1358 async fn try_wait_epoch(
1359 &self,
1360 epoch: HummockReadEpoch,
1361 options: TryWaitEpochOptions,
1362 ) -> StorageResult<()> {
1363 self.try_wait_epoch(epoch, options).await
1364 }
1365
1366 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1367 StateStorePointer(Box::new(self.new_local(option).await))
1368 }
1369
1370 async fn new_read_snapshot(
1371 &self,
1372 epoch: HummockReadEpoch,
1373 options: NewReadSnapshotOptions,
1374 ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1375 Ok(StateStorePointer(Arc::new(
1376 self.new_read_snapshot(epoch, options).await?,
1377 )))
1378 }
1379
1380 async fn new_vector_writer(
1381 &self,
1382 options: NewVectorWriterOptions,
1383 ) -> BoxDynStateStoreWriteVector {
1384 StateStorePointer(Box::new(self.new_vector_writer(options).await))
1385 }
1386 }
1387
/// Shared, type-erased handle to a full [`DynStateStore`].
pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1389
/// Implements `AsRef<dyn Target>` for a `StateStorePointer` wrapping a smart
/// pointer to `dyn Source`. The `as _` coercion is a trait-object upcast, so
/// `Target` must be a supertrait of `Source` (or `Source` itself).
macro_rules! state_store_pointer_dyn_as_ref {
    ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
        impl AsRef<dyn $target_dyn_trait>
            for StateStorePointer<$pointer<dyn $source_dyn_trait>>
        {
            fn as_ref(&self) -> &dyn $target_dyn_trait {
                (&*self.0) as _
            }
        }
    };
}

// Upcasts powering the generic facade impls on `StateStorePointer` above.
state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1408
/// Mutable counterpart of `state_store_pointer_dyn_as_ref!`: implements
/// `AsMut<dyn Target>` for a `StateStorePointer` wrapping a smart pointer to
/// `dyn Source`, via a trait-object upcast.
macro_rules! state_store_pointer_dyn_as_mut {
    ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
        impl AsMut<dyn $target_dyn_trait>
            for StateStorePointer<$pointer<dyn $source_dyn_trait>>
        {
            fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                (&mut *self.0) as _
            }
        }
    };
}

// Both writable pointers expose epoch control mutably.
state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
state_store_pointer_dyn_as_mut!(
    Box<dyn DynStateStoreWriteVector>,
    DynStateStoreWriteEpochControl
);
1426
/// Newtype over a smart pointer (`Arc<dyn …>` / `Box<dyn …>`) to a type-erased
/// state store. The statically-typed state-store traits are implemented on this
/// wrapper in terms of the object-safe `Dyn*` traits.
#[derive(Clone)]
pub struct StateStorePointer<P>(pub(crate) P);
1429
1430 impl<P> StateStoreGet for StateStorePointer<P>
1431 where
1432 StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1433 {
1434 async fn on_key_value<'a, O: Send + 'a>(
1435 &'a self,
1436 key: TableKey<Bytes>,
1437 read_options: ReadOptions,
1438 on_key_value_fn: impl KeyValueFn<'a, O>,
1439 ) -> StorageResult<Option<O>> {
1440 let option = self.as_ref().get_keyed_row(key, read_options).await?;
1441 option
1442 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1443 .transpose()
1444 }
1445 }
1446
1447 impl<P> StateStoreRead for StateStorePointer<P>
1448 where
1449 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1450 {
1451 type Iter = BoxStateStoreReadIter;
1452 type RevIter = BoxStateStoreReadIter;
1453
1454 fn iter(
1455 &self,
1456 key_range: TableKeyRange,
1457
1458 read_options: ReadOptions,
1459 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1460 self.as_ref().iter(key_range, read_options)
1461 }
1462
1463 fn rev_iter(
1464 &self,
1465 key_range: TableKeyRange,
1466
1467 read_options: ReadOptions,
1468 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1469 self.as_ref().rev_iter(key_range, read_options)
1470 }
1471 }
1472
1473 impl StateStoreReadLog for StateStoreDynRef {
1474 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1475
1476 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1477 (*self.0).next_epoch(epoch, options).await
1478 }
1479
1480 fn iter_log(
1481 &self,
1482 epoch_range: (u64, u64),
1483 key_range: TableKeyRange,
1484 options: ReadLogOptions,
1485 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1486 (*self.0).iter_log(epoch_range, key_range, options)
1487 }
1488 }
1489
/// Full object-safe state store: change-log reads, store entry points, and the
/// Hummock downcast hook.
pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1491
1492 impl AsHummock for StateStoreDynRef {
1493 fn as_hummock(&self) -> Option<&HummockStorage> {
1494 (*self.0).as_hummock()
1495 }
1496 }
1497
/// Blanket impl: anything with all three capabilities is a [`DynStateStore`].
impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1499
1500 impl StateStore for StateStoreDynRef {
1501 type Local = BoxDynLocalStateStore;
1502 type ReadSnapshot = StateStoreReadSnapshotDynRef;
1503 type VectorWriter = BoxDynStateStoreWriteVector;
1504
1505 fn try_wait_epoch(
1506 &self,
1507 epoch: HummockReadEpoch,
1508 options: TryWaitEpochOptions,
1509 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1510 (*self.0).try_wait_epoch(epoch, options)
1511 }
1512
1513 fn new_local(
1514 &self,
1515 option: NewLocalOptions,
1516 ) -> impl Future<Output = Self::Local> + Send + '_ {
1517 (*self.0).new_local(option)
1518 }
1519
1520 async fn new_read_snapshot(
1521 &self,
1522 epoch: HummockReadEpoch,
1523 options: NewReadSnapshotOptions,
1524 ) -> StorageResult<Self::ReadSnapshot> {
1525 (*self.0).new_read_snapshot(epoch, options).await
1526 }
1527
1528 fn new_vector_writer(
1529 &self,
1530 options: NewVectorWriterOptions,
1531 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1532 (*self.0).new_vector_writer(options)
1533 }
1534 }
1535}