1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22 BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
23 HybridCacheBuilder,
24};
25use futures::FutureExt;
26use futures::future::BoxFuture;
27use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
28use risingwave_common::catalog::TableId;
29use risingwave_common::license::Feature;
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common_service::RpcNotificationClient;
32use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
33use risingwave_object_store::object::build_remote_object_store;
34use thiserror_ext::AsReport;
35
36use crate::StateStore;
37use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
38use crate::error::StorageResult;
39use crate::hummock::all::AllRecentFilter;
40use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
41use crate::hummock::none::NoneRecentFilter;
42use crate::hummock::sharded::ShardedRecentFilter;
43use crate::hummock::simple::SimpleRecentFilter;
44use crate::hummock::{
45 Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
46 SstableStore, SstableStoreConfig,
47};
48use crate::memory::MemoryStateStore;
49use crate::memory::sled::SledStateStore;
50use crate::monitor::{
51 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
52 ObjectStoreMetrics,
53};
54use crate::opts::StorageOpts;
55
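// Shared metrics registry for the foyer caches, backed by the process-wide Prometheus
// registry. Both the meta and block hybrid caches built below register their metrics here.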
56static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
57 Box::new(PrometheusMetricsRegistry::new(
58 GLOBAL_METRICS_REGISTRY.clone(),
59 ))
60});
61
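// Opaque type aliases for the concrete state store implementations. Each alias hides the
// exact wrapper stack (dynamic dispatch and verification are applied only in debug builds,
// see `may_dynamic_dispatch` and `may_verify` below) behind `impl StateStore + AsHummock`.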
62mod opaque_type {
63 use super::*;
64
65 pub type HummockStorageType = impl StateStore + AsHummock;
66 pub type MemoryStateStoreType = impl StateStore + AsHummock;
67 pub type SledStateStoreType = impl StateStore + AsHummock;
68
69 #[define_opaque(MemoryStateStoreType)]
70 pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
71 may_dynamic_dispatch(state_store)
72 }
73
74 #[define_opaque(HummockStorageType)]
75 pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
76 may_dynamic_dispatch(may_verify(state_store))
77 }
78
79 #[define_opaque(SledStateStoreType)]
80 pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
81 may_dynamic_dispatch(state_store)
82 }
83}
84pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
85use opaque_type::{hummock, in_memory, sled};
86
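// With the `hm-trace` feature enabled, every store is additionally wrapped in a
// `TracedStateStore` before the monitoring layer; otherwise it is wrapped only in
// `MonitoredStateStore`.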
87#[cfg(feature = "hm-trace")]
88type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;
89
90#[cfg(not(feature = "hm-trace"))]
91type Monitored<S> = MonitoredStateStore<S>;
92
93fn monitored<S: StateStore>(
94 state_store: S,
95 storage_metrics: Arc<MonitoredStorageMetrics>,
96) -> Monitored<S> {
97 let inner = {
98 #[cfg(feature = "hm-trace")]
99 {
100 crate::monitor::traced_store::TracedStateStore::new_global(state_store)
101 }
102 #[cfg(not(feature = "hm-trace"))]
103 {
104 state_store
105 }
106 };
107 inner.monitored(storage_metrics)
108}
109
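// Peels off the monitoring (and, with `hm-trace`, tracing) wrappers to reach the underlying store.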
110fn inner<S>(state_store: &Monitored<S>) -> &S {
111 let inner = state_store.inner();
112 {
113 #[cfg(feature = "hm-trace")]
114 {
115 inner.inner()
116 }
117 #[cfg(not(feature = "hm-trace"))]
118 {
119 inner
120 }
121 }
122}
123
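// The top-level state store handle: a Hummock store for production use, plus in-memory and
// sled-backed stores for tests and local development. All variants carry the monitoring layer.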
124#[derive(Clone, EnumAsInner)]
126#[allow(clippy::enum_variant_names)]
127pub enum StateStoreImpl {
128 HummockStateStore(Monitored<HummockStorageType>),
137 MemoryStateStore(Monitored<MemoryStateStoreType>),
142 SledStateStore(Monitored<SledStateStoreType>),
143}
144
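// In debug builds, wrap the store in a `StateStorePointer` so it is accessed through the
// object-safe traits in `dyn_state_store`; in release builds, return it unchanged.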
145fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
146 #[cfg(not(debug_assertions))]
147 {
148 state_store
149 }
150 #[cfg(debug_assertions)]
151 {
152 use crate::store_impl::dyn_state_store::StateStorePointer;
153 StateStorePointer(Arc::new(state_store) as _)
154 }
155}
156
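// In debug builds, optionally pair the store with a temporary sled store and cross-check every
// operation against it (see the `verify` module). The check is enabled by setting the
// `ENABLE_STATE_STORE_VERIFY` environment variable; release builds return the store unchanged.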
157fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
158 #[cfg(not(debug_assertions))]
159 {
160 state_store
161 }
162 #[cfg(debug_assertions)]
163 {
164 use std::marker::PhantomData;
165
166 use risingwave_common::util::env_var::env_var_is_true;
167 use tracing::info;
168
169 use crate::store_impl::verify::VerifyStateStore;
170
171 let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
172 info!("enable verify state store");
173 Some(SledStateStore::new_temp())
174 } else {
175 info!("verify state store is not enabled");
176 None
177 };
178 VerifyStateStore {
179 actual: state_store,
180 expected,
181 _phantom: PhantomData::<()>,
182 }
183 }
184}
185
186impl StateStoreImpl {
187 fn in_memory(
188 state_store: MemoryStateStore,
189 storage_metrics: Arc<MonitoredStorageMetrics>,
190 ) -> Self {
191 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
193 }
194
195 pub fn hummock(
196 state_store: HummockStorage,
197 storage_metrics: Arc<MonitoredStorageMetrics>,
198 ) -> Self {
199 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
201 }
202
203 pub fn sled(
204 state_store: SledStateStore,
205 storage_metrics: Arc<MonitoredStorageMetrics>,
206 ) -> Self {
207 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
208 }
209
210 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
211 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
212 }
213
214 pub fn for_test() -> Self {
215 Self::in_memory(
216 MemoryStateStore::new(),
217 Arc::new(MonitoredStorageMetrics::unused()),
218 )
219 }
220
221 pub fn as_hummock(&self) -> Option<&HummockStorage> {
222 match self {
223 StateStoreImpl::HummockStateStore(hummock) => {
224 Some(inner(hummock).as_hummock().expect("should be hummock"))
225 }
226 _ => None,
227 }
228 }
229}
230
231impl Debug for StateStoreImpl {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 match self {
234 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
235 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
236 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
237 }
238 }
239}
240
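/// Dispatches on the concrete `StateStoreImpl` variant, binding the monitored store to
/// `$store` and evaluating `$body` against it. In release builds the in-memory and sled arms
/// panic via `unimplemented!`. A minimal usage sketch (`handle_store` is a hypothetical
/// function generic over `StateStore`):
///
/// ```ignore
/// dispatch_state_store!(state_store_impl, store, { handle_store(store).await });
/// ```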
241#[macro_export]
242macro_rules! dispatch_state_store {
243 ($impl:expr, $store:ident, $body:tt) => {{
244 use $crate::store_impl::StateStoreImpl;
245
246 match $impl {
247 StateStoreImpl::MemoryStateStore($store) => {
248 #[cfg(debug_assertions)]
251 {
252 $body
253 }
254 #[cfg(not(debug_assertions))]
255 {
256 let _store = $store;
257 unimplemented!("memory state store should never be used in release mode");
258 }
259 }
260
261 StateStoreImpl::SledStateStore($store) => {
262 #[cfg(debug_assertions)]
265 {
266 $body
267 }
268 #[cfg(not(debug_assertions))]
269 {
270 let _store = $store;
271 unimplemented!("sled state store should never be used in release mode");
272 }
273 }
274
275 StateStoreImpl::HummockStateStore($store) => $body,
276 }
277 }};
278}
279
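// Debug/test-only wrapper that mirrors every read and write to an optional `expected` store
// and asserts that the results returned by the `actual` store match.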
280#[cfg(any(debug_assertions, test, feature = "test"))]
281pub mod verify {
282 use std::fmt::Debug;
283 use std::future::Future;
284 use std::marker::PhantomData;
285 use std::ops::Deref;
286 use std::sync::Arc;
287
288 use bytes::Bytes;
289 use risingwave_common::array::VectorRef;
290 use risingwave_common::bitmap::Bitmap;
291 use risingwave_common::hash::VirtualNode;
292 use risingwave_hummock_sdk::HummockReadEpoch;
293 use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
294 use tracing::log::warn;
295
296 use crate::error::StorageResult;
297 use crate::hummock::HummockStorage;
298 use crate::store::*;
299 use crate::store_impl::AsHummock;
300
301 #[expect(dead_code)]
302 fn assert_result_eq<Item: PartialEq + Debug, E>(
303 first: &std::result::Result<Item, E>,
304 second: &std::result::Result<Item, E>,
305 ) {
306 match (first, second) {
307 (Ok(first), Ok(second)) => {
308 if first != second {
309 warn!("result different: {:?} {:?}", first, second);
310 }
311 assert_eq!(first, second);
312 }
313 (Err(_), Err(_)) => {}
314 _ => {
315 warn!("one success and one failed");
316 panic!("result not equal");
317 }
318 }
319 }
320
321 #[derive(Clone)]
322 pub struct VerifyStateStore<A, E, T = ()> {
323 pub actual: A,
324 pub expected: Option<E>,
325 pub _phantom: PhantomData<T>,
326 }
327
328 impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
329 fn as_hummock(&self) -> Option<&HummockStorage> {
330 self.actual.as_hummock()
331 }
332 }
333
334 impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
335 async fn on_key_value<'a, O: Send + 'a>(
336 &'a self,
337 key: TableKey<Bytes>,
338 read_options: ReadOptions,
339 on_key_value_fn: impl KeyValueFn<'a, O>,
340 ) -> StorageResult<Option<O>> {
341 let actual: Option<(FullKey<Bytes>, Bytes)> = self
342 .actual
343 .on_key_value(key.clone(), read_options.clone(), |key, value| {
344 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
345 })
346 .await?;
347 if let Some(expected) = &self.expected {
348 let expected: Option<(FullKey<Bytes>, Bytes)> = expected
349 .on_key_value(key, read_options, |key, value| {
350 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
351 })
352 .await?;
353 assert_eq!(
354 actual
355 .as_ref()
356 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
357 expected
358 .as_ref()
359 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
360 );
361 }
362
363 actual
364 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
365 .transpose()
366 }
367 }
368
369 impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
370 for VerifyStateStore<A, E>
371 {
372 fn nearest<'a, O: Send + 'a>(
373 &'a self,
374 vec: VectorRef<'a>,
375 options: VectorNearestOptions,
376 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
377 ) -> impl StorageFuture<'a, Vec<O>> {
378 self.actual.nearest(vec, options, on_nearest_item_fn)
379 }
380 }
381
382 impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
383 type Iter = impl StateStoreReadIter;
384 type RevIter = impl StateStoreReadIter;
385
386 #[expect(clippy::manual_async_fn)]
389 fn iter(
390 &self,
391 key_range: TableKeyRange,
392
393 read_options: ReadOptions,
394 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
395 async move {
396 let actual = self
397 .actual
398 .iter(key_range.clone(), read_options.clone())
399 .await?;
400 let expected = if let Some(expected) = &self.expected {
401 Some(expected.iter(key_range, read_options).await?)
402 } else {
403 None
404 };
405
406 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
407 }
408 }
409
410 #[expect(clippy::manual_async_fn)]
411 fn rev_iter(
412 &self,
413 key_range: TableKeyRange,
414
415 read_options: ReadOptions,
416 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
417 async move {
418 let actual = self
419 .actual
420 .rev_iter(key_range.clone(), read_options.clone())
421 .await?;
422 let expected = if let Some(expected) = &self.expected {
423 Some(expected.rev_iter(key_range, read_options).await?)
424 } else {
425 None
426 };
427
428 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
429 }
430 }
431 }
432
433 impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
434 type ChangeLogIter = impl StateStoreReadChangeLogIter;
435
436 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
437 let actual = self.actual.next_epoch(epoch, options.clone()).await?;
438 if let Some(expected) = &self.expected {
439 assert_eq!(actual, expected.next_epoch(epoch, options).await?);
440 }
441 Ok(actual)
442 }
443
444 async fn iter_log(
445 &self,
446 epoch_range: (u64, u64),
447 key_range: TableKeyRange,
448 options: ReadLogOptions,
449 ) -> StorageResult<Self::ChangeLogIter> {
450 let actual = self
451 .actual
452 .iter_log(epoch_range, key_range.clone(), options.clone())
453 .await?;
454 let expected = if let Some(expected) = &self.expected {
455 Some(expected.iter_log(epoch_range, key_range, options).await?)
456 } else {
457 None
458 };
459
460 Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
461 }
462 }
463
464 impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
465 for VerifyStateStore<A, E, T>
466 where
467 for<'a> T::ItemRef<'a>: PartialEq + Debug,
468 {
469 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
470 let actual = self.actual.try_next().await?;
471 if let Some(expected) = self.expected.as_mut() {
472 let expected = expected.try_next().await?;
473 assert_eq!(actual, expected);
474 }
475 Ok(actual)
476 }
477 }
478
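// Pairs an actual iterator with an optional expected one so that `try_next` can assert that
// both yield identical items.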
479 fn verify_iter<T: IterItem>(
480 actual: impl StateStoreIter<T>,
481 expected: Option<impl StateStoreIter<T>>,
482 ) -> impl StateStoreIter<T>
483 where
484 for<'a> T::ItemRef<'a>: PartialEq + Debug,
485 {
486 VerifyStateStore {
487 actual,
488 expected,
489 _phantom: PhantomData::<T>,
490 }
491 }
492
493 impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
494 type FlushedSnapshotReader =
495 VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
496
497 type Iter<'a> = impl StateStoreIter + 'a;
498 type RevIter<'a> = impl StateStoreIter + 'a;
499
500 #[expect(clippy::manual_async_fn)]
501 fn iter(
502 &self,
503 key_range: TableKeyRange,
504 read_options: ReadOptions,
505 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
506 async move {
507 let actual = self
508 .actual
509 .iter(key_range.clone(), read_options.clone())
510 .await?;
511 let expected = if let Some(expected) = &self.expected {
512 Some(expected.iter(key_range, read_options).await?)
513 } else {
514 None
515 };
516
517 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
518 }
519 }
520
521 #[expect(clippy::manual_async_fn)]
522 fn rev_iter(
523 &self,
524 key_range: TableKeyRange,
525 read_options: ReadOptions,
526 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
527 async move {
528 let actual = self
529 .actual
530 .rev_iter(key_range.clone(), read_options.clone())
531 .await?;
532 let expected = if let Some(expected) = &self.expected {
533 Some(expected.rev_iter(key_range, read_options).await?)
534 } else {
535 None
536 };
537
538 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
539 }
540 }
541
542 fn insert(
543 &mut self,
544 key: TableKey<Bytes>,
545 new_val: Bytes,
546 old_val: Option<Bytes>,
547 ) -> StorageResult<()> {
548 if let Some(expected) = &mut self.expected {
549 expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
550 }
551 self.actual.insert(key, new_val, old_val)?;
552
553 Ok(())
554 }
555
556 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
557 if let Some(expected) = &mut self.expected {
558 expected.delete(key.clone(), old_val.clone())?;
559 }
560 self.actual.delete(key, old_val)?;
561 Ok(())
562 }
563
564 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
565 let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
566 if let Some(expected) = &mut self.expected {
567 assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
568 }
569 Ok(ret)
570 }
571
572 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
573 let ret = self.actual.get_table_watermark(vnode);
574 if let Some(expected) = &self.expected {
575 assert_eq!(ret, expected.get_table_watermark(vnode));
576 }
577 ret
578 }
579
580 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
581 VerifyStateStore {
582 actual: self.actual.new_flushed_snapshot_reader(),
583 expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
584 _phantom: Default::default(),
585 }
586 }
587 }
588
589 impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
590 for VerifyStateStore<A, E>
591 {
592 async fn flush(&mut self) -> StorageResult<usize> {
593 if let Some(expected) = &mut self.expected {
594 expected.flush().await?;
595 }
596 self.actual.flush().await
597 }
598
599 async fn try_flush(&mut self) -> StorageResult<()> {
600 if let Some(expected) = &mut self.expected {
601 expected.try_flush().await?;
602 }
603 self.actual.try_flush().await
604 }
605
606 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
607 self.actual.init(options.clone()).await?;
608 if let Some(expected) = &mut self.expected {
609 expected.init(options).await?;
610 }
611 Ok(())
612 }
613
614 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
615 if let Some(expected) = &mut self.expected {
616 expected.seal_current_epoch(next_epoch, opts.clone());
617 }
618 self.actual.seal_current_epoch(next_epoch, opts);
619 }
620 }
621
622 impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
623 type Local = VerifyStateStore<A::Local, E::Local>;
624 type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
625 type VectorWriter = A::VectorWriter;
626
627 fn try_wait_epoch(
628 &self,
629 epoch: HummockReadEpoch,
630 options: TryWaitEpochOptions,
631 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
632 self.actual.try_wait_epoch(epoch, options)
633 }
634
635 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
636 let expected = if let Some(expected) = &self.expected {
637 Some(expected.new_local(option.clone()).await)
638 } else {
639 None
640 };
641 VerifyStateStore {
642 actual: self.actual.new_local(option).await,
643 expected,
644 _phantom: PhantomData::<()>,
645 }
646 }
647
648 async fn new_read_snapshot(
649 &self,
650 epoch: HummockReadEpoch,
651 options: NewReadSnapshotOptions,
652 ) -> StorageResult<Self::ReadSnapshot> {
653 let expected = if let Some(expected) = &self.expected {
654 Some(expected.new_read_snapshot(epoch, options).await?)
655 } else {
656 None
657 };
658 Ok(VerifyStateStore {
659 actual: self.actual.new_read_snapshot(epoch, options).await?,
660 expected,
661 _phantom: PhantomData::<()>,
662 })
663 }
664
665 fn new_vector_writer(
666 &self,
667 options: NewVectorWriterOptions,
668 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
669 self.actual.new_vector_writer(options)
670 }
671 }
672
673 impl<A, E> Deref for VerifyStateStore<A, E> {
674 type Target = A;
675
676 fn deref(&self) -> &Self::Target {
677 &self.actual
678 }
679 }
680}
681
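// `StateStoreImpl::new` below constructs the store from a state store URL:
// "hummock+<object store URL>" selects Hummock, "in_memory"/"in-memory" the shared in-memory
// store, and "sled://<path>" the sled store. A minimal call sketch, with every argument value
// being a hypothetical placeholder:
//
//     let store = StateStoreImpl::new(
//         "hummock+memory",
//         opts,
//         hummock_meta_client,
//         state_store_metrics,
//         object_store_metrics,
//         storage_metrics,
//         compactor_metrics,
//         None, // await_tree_config
//         true, // use_new_object_prefix_strategy
//     )
//     .await?;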
682impl StateStoreImpl {
683 #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
684 #[allow(clippy::too_many_arguments)]
685 #[expect(clippy::borrowed_box)]
686 pub async fn new(
687 s: &str,
688 opts: Arc<StorageOpts>,
689 hummock_meta_client: Arc<MonitoredHummockMetaClient>,
690 state_store_metrics: Arc<HummockStateStoreMetrics>,
691 object_store_metrics: Arc<ObjectStoreMetrics>,
692 storage_metrics: Arc<MonitoredStorageMetrics>,
693 compactor_metrics: Arc<CompactorMetrics>,
694 await_tree_config: Option<await_tree::Config>,
695 use_new_object_prefix_strategy: bool,
696 ) -> StorageResult<Self> {
697 const KB: usize = 1 << 10;
698 const MB: usize = 1 << 20;
699
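// Build the foyer hybrid cache for SSTable metadata: an in-memory tier sized by
// `meta_cache_capacity_mb`, plus an optional on-disk tier when `meta_file_cache_dir` is set
// (gated by the `ElasticDiskCache` license feature). The block cache below is built the same way.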
700 let meta_cache = {
701 let mut builder = HybridCacheBuilder::new()
702 .with_name("foyer.meta")
703 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
704 .memory(opts.meta_cache_capacity_mb * MB)
705 .with_shards(opts.meta_cache_shard_num)
706 .with_eviction_config(opts.meta_cache_eviction_config.clone())
707 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
708 u64::BITS as usize / 8 + value.estimate_size()
709 })
710 .storage();
711
712 if !opts.meta_file_cache_dir.is_empty() {
713 if let Err(e) = Feature::ElasticDiskCache.check_available() {
714 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
715 } else {
716 let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
717 .with_capacity(opts.meta_file_cache_capacity_mb * MB)
718 .with_throttle(opts.meta_file_cache_throttle.clone())
719 .build()
720 .map_err(HummockError::foyer_error)?;
721 let engine_builder = BlockEngineBuilder::new(device)
722 .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
723 .with_indexer_shards(opts.meta_file_cache_indexer_shards)
724 .with_flushers(opts.meta_file_cache_flushers)
725 .with_reclaimers(opts.meta_file_cache_reclaimers)
.with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB)
.with_clean_block_threshold(
    opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
)
730 .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
731 .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
732 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
733 opts.meta_file_cache_fifo_probation_ratio,
734 ))]);
735 builder = builder
736 .with_engine_config(engine_builder)
737 .with_recover_mode(opts.meta_file_cache_recover_mode)
738 .with_compression(opts.meta_file_cache_compression)
739 .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
740 }
741 }
742
743 builder.build().await.map_err(HummockError::foyer_error)?
744 };
745
746 let block_cache = {
747 let mut builder = HybridCacheBuilder::new()
748 .with_name("foyer.data")
749 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
750 .with_event_listener(Arc::new(BlockCacheEventListener::new(
751 state_store_metrics.clone(),
752 )))
753 .memory(opts.block_cache_capacity_mb * MB)
754 .with_shards(opts.block_cache_shard_num)
755 .with_eviction_config(opts.block_cache_eviction_config.clone())
756 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
757 u64::BITS as usize * 2 / 8 + value.raw().len()
759 })
760 .storage();
761
762 if !opts.data_file_cache_dir.is_empty() {
763 if let Err(e) = Feature::ElasticDiskCache.check_available() {
764 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
765 } else {
766 let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
767 .with_capacity(opts.data_file_cache_capacity_mb * MB)
768 .with_throttle(opts.data_file_cache_throttle.clone())
769 .build()
770 .map_err(HummockError::foyer_error)?;
771 let engine_builder = BlockEngineBuilder::new(device)
772 .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
773 .with_indexer_shards(opts.data_file_cache_indexer_shards)
774 .with_flushers(opts.data_file_cache_flushers)
775 .with_reclaimers(opts.data_file_cache_reclaimers)
.with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB)
.with_clean_block_threshold(
    opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
)
780 .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
781 .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
782 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
783 opts.data_file_cache_fifo_probation_ratio,
784 ))]);
785 builder = builder
786 .with_engine_config(engine_builder)
787 .with_recover_mode(opts.data_file_cache_recover_mode)
788 .with_compression(opts.data_file_cache_compression)
789 .with_runtime_options(opts.data_file_cache_runtime_config.clone());
790 }
791 }
792
793 builder.build().await.map_err(HummockError::foyer_error)?
794 };
795
796 let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
797 .with_shards(opts.vector_meta_cache_shard_num)
798 .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
799 .build();
800
801 let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
802 .with_shards(opts.vector_block_cache_shard_num)
803 .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
804 .build();
805
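// Choose the recent filter used for disk cache refill: disabled when there is no data file
// cache, a single-shard filter when only one shard is configured, an "admit everything" filter
// when the recent-filter check is skipped, and a sharded filter otherwise.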
806 let recent_filter = if opts.data_file_cache_dir.is_empty() {
807 Arc::new(NoneRecentFilter::default().into())
808 } else if opts.cache_refill_recent_filter_shards == 1 {
809 Arc::new(
810 SimpleRecentFilter::new(
811 opts.cache_refill_recent_filter_layers,
812 Duration::from_millis(
813 opts.cache_refill_recent_filter_rotate_interval_ms as u64,
814 ),
815 )
816 .into(),
817 )
818 } else if opts.cache_refill_skip_recent_filter {
819 Arc::new(AllRecentFilter::default().into())
820 } else {
821 Arc::new(
822 ShardedRecentFilter::new(
823 opts.cache_refill_recent_filter_layers,
824 Duration::from_millis(
825 opts.cache_refill_recent_filter_rotate_interval_ms as u64,
826 ),
827 opts.cache_refill_recent_filter_shards,
828 )
829 .into(),
830 )
831 };
832
833 let store = match s {
834 hummock if hummock.starts_with("hummock+") => {
835 let object_store = build_remote_object_store(
836 hummock.strip_prefix("hummock+").unwrap(),
837 object_store_metrics.clone(),
838 "Hummock",
839 Arc::new(opts.object_store_config.clone()),
840 )
841 .await;
842
843 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
844 store: Arc::new(object_store),
845 path: opts.data_directory.clone(),
846 prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
847 max_prefetch_block_number: opts.max_prefetch_block_number,
848 recent_filter,
849 state_store_metrics: state_store_metrics.clone(),
850 use_new_object_prefix_strategy,
851 skip_bloom_filter_in_serde: opts.sst_skip_bloom_filter_in_serde,
852
853 meta_cache,
854 block_cache,
855 vector_meta_cache,
856 vector_block_cache,
857 }));
858 let notification_client =
859 RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
860 let compaction_catalog_manager_ref =
861 Arc::new(CompactionCatalogManager::new(Box::new(
862 RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
863 )));
864
865 let inner = HummockStorage::new(
866 opts.clone(),
867 sstable_store,
868 hummock_meta_client.clone(),
869 notification_client,
870 compaction_catalog_manager_ref,
871 state_store_metrics.clone(),
872 compactor_metrics.clone(),
873 await_tree_config,
874 )
875 .await?;
876
877 StateStoreImpl::hummock(inner, storage_metrics)
878 }
879
880 "in_memory" | "in-memory" => {
881 tracing::warn!(
882 "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
883 );
884 StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
885 }
886
887 sled if sled.starts_with("sled://") => {
888 tracing::warn!(
889 "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
890 );
891 let path = sled.strip_prefix("sled://").unwrap();
892 StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
893 }
894
895 other => unimplemented!("{} state store is not supported", other),
896 };
897
898 Ok(store)
899 }
900}
901
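// Accessor for the underlying `HummockStorage`, if any. The default `sync` implementation
// forwards to Hummock when present and otherwise reports an empty `SyncResult`.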
902pub trait AsHummock: Send + Sync {
903 fn as_hummock(&self) -> Option<&HummockStorage>;
904
905 fn sync(
906 &self,
907 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
908 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
909 async move {
910 if let Some(hummock) = self.as_hummock() {
911 hummock.sync(sync_table_epochs).await
912 } else {
913 Ok(SyncResult::default())
914 }
915 }
916 .boxed()
917 }
918}
919
920impl AsHummock for HummockStorage {
921 fn as_hummock(&self) -> Option<&HummockStorage> {
922 Some(self)
923 }
924}
925
926impl AsHummock for MemoryStateStore {
927 fn as_hummock(&self) -> Option<&HummockStorage> {
928 None
929 }
930}
931
932impl AsHummock for SledStateStore {
933 fn as_hummock(&self) -> Option<&HummockStorage> {
934 None
935 }
936}
937
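// Object-safe (`dyn`-compatible) mirrors of the state store traits, plus the
// `StateStorePointer` newtype that forwards the statically dispatched traits to them. Only
// compiled in debug builds, where `may_dynamic_dispatch` routes stores through these types.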
938#[cfg(debug_assertions)]
939mod dyn_state_store {
940 use std::future::Future;
941 use std::ops::DerefMut;
942 use std::sync::Arc;
943
944 use bytes::Bytes;
945 use risingwave_common::array::VectorRef;
946 use risingwave_common::bitmap::Bitmap;
947 use risingwave_common::hash::VirtualNode;
948 use risingwave_hummock_sdk::HummockReadEpoch;
949 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
950
951 use crate::error::StorageResult;
952 use crate::hummock::HummockStorage;
953 use crate::store::*;
954 use crate::store_impl::AsHummock;
955 use crate::vector::VectorDistance;
956
957 #[async_trait::async_trait]
958 pub trait DynStateStoreIter<T: IterItem>: Send {
959 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
960 }
961
962 #[async_trait::async_trait]
963 impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
964 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
965 self.try_next().await
966 }
967 }
968
969 pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
970 impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
971 fn try_next(
972 &mut self,
973 ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
974 self.deref_mut().try_next()
975 }
976 }
977
978 pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
981 pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
982
983 #[async_trait::async_trait]
984 pub trait DynStateStoreGet: StaticSendSync {
985 async fn get_keyed_row(
986 &self,
987 key: TableKey<Bytes>,
988 read_options: ReadOptions,
989 ) -> StorageResult<Option<StateStoreKeyedRow>>;
990 }
991
992 #[async_trait::async_trait]
993 pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
994 async fn iter(
995 &self,
996 key_range: TableKeyRange,
997
998 read_options: ReadOptions,
999 ) -> StorageResult<BoxStateStoreReadIter>;
1000
1001 async fn rev_iter(
1002 &self,
1003 key_range: TableKeyRange,
1004
1005 read_options: ReadOptions,
1006 ) -> StorageResult<BoxStateStoreReadIter>;
1007 }
1008
1009 #[async_trait::async_trait]
1010 pub trait DynStateStoreReadLog: StaticSendSync {
1011 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
1012 async fn iter_log(
1013 &self,
1014 epoch_range: (u64, u64),
1015 key_range: TableKeyRange,
1016 options: ReadLogOptions,
1017 ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
1018 }
1019
1020 pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
1021
1022 #[async_trait::async_trait]
1023 impl<S: StateStoreGet> DynStateStoreGet for S {
1024 async fn get_keyed_row(
1025 &self,
1026 key: TableKey<Bytes>,
1027 read_options: ReadOptions,
1028 ) -> StorageResult<Option<StateStoreKeyedRow>> {
1029 self.on_key_value(key, read_options, move |key, value| {
1030 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
1031 })
1032 .await
1033 }
1034 }
1035
1036 #[async_trait::async_trait]
1037 impl<S: StateStoreRead> DynStateStoreRead for S {
1038 async fn iter(
1039 &self,
1040 key_range: TableKeyRange,
1041
1042 read_options: ReadOptions,
1043 ) -> StorageResult<BoxStateStoreReadIter> {
1044 Ok(Box::new(self.iter(key_range, read_options).await?))
1045 }
1046
1047 async fn rev_iter(
1048 &self,
1049 key_range: TableKeyRange,
1050
1051 read_options: ReadOptions,
1052 ) -> StorageResult<BoxStateStoreReadIter> {
1053 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1054 }
1055 }
1056
1057 #[async_trait::async_trait]
1058 impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1059 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1060 self.next_epoch(epoch, options).await
1061 }
1062
1063 async fn iter_log(
1064 &self,
1065 epoch_range: (u64, u64),
1066 key_range: TableKeyRange,
1067 options: ReadLogOptions,
1068 ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1069 Ok(Box::new(
1070 self.iter_log(epoch_range, key_range, options).await?,
1071 ))
1072 }
1073 }
1074
1075 pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1077 #[async_trait::async_trait]
1078 pub trait DynLocalStateStore:
1079 DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
1080 {
1081 async fn iter(
1082 &self,
1083 key_range: TableKeyRange,
1084 read_options: ReadOptions,
1085 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1086
1087 async fn rev_iter(
1088 &self,
1089 key_range: TableKeyRange,
1090 read_options: ReadOptions,
1091 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1092
1093 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1094
1095 fn insert(
1096 &mut self,
1097 key: TableKey<Bytes>,
1098 new_val: Bytes,
1099 old_val: Option<Bytes>,
1100 ) -> StorageResult<()>;
1101
1102 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1103
1104 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1105
1106 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1107 }
1108
1109 #[async_trait::async_trait]
1110 pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1111 async fn flush(&mut self) -> StorageResult<usize>;
1112
1113 async fn try_flush(&mut self) -> StorageResult<()>;
1114
1115 async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1116
1117 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1118 }
1119
1120 #[async_trait::async_trait]
1121 impl<S: LocalStateStore> DynLocalStateStore for S {
1122 async fn iter(
1123 &self,
1124 key_range: TableKeyRange,
1125 read_options: ReadOptions,
1126 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1127 Ok(Box::new(self.iter(key_range, read_options).await?))
1128 }
1129
1130 async fn rev_iter(
1131 &self,
1132 key_range: TableKeyRange,
1133 read_options: ReadOptions,
1134 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1135 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1136 }
1137
1138 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1139 StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1140 }
1141
1142 fn insert(
1143 &mut self,
1144 key: TableKey<Bytes>,
1145 new_val: Bytes,
1146 old_val: Option<Bytes>,
1147 ) -> StorageResult<()> {
1148 self.insert(key, new_val, old_val)
1149 }
1150
1151 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1152 self.delete(key, old_val)
1153 }
1154
1155 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1156 self.update_vnode_bitmap(vnodes).await
1157 }
1158
1159 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1160 self.get_table_watermark(vnode)
1161 }
1162 }
1163
1164 #[async_trait::async_trait]
1165 impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1166 async fn flush(&mut self) -> StorageResult<usize> {
1167 self.flush().await
1168 }
1169
1170 async fn try_flush(&mut self) -> StorageResult<()> {
1171 self.try_flush().await
1172 }
1173
1174 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1175 self.init(options).await
1176 }
1177
1178 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1179 self.seal_current_epoch(next_epoch, opts)
1180 }
1181 }
1182
1183 pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1184
1185 impl LocalStateStore for BoxDynLocalStateStore {
1186 type FlushedSnapshotReader = StateStoreReadDynRef;
1187 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1188 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1189
1190 fn iter(
1191 &self,
1192 key_range: TableKeyRange,
1193 read_options: ReadOptions,
1194 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1195 (*self.0).iter(key_range, read_options)
1196 }
1197
1198 fn rev_iter(
1199 &self,
1200 key_range: TableKeyRange,
1201 read_options: ReadOptions,
1202 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1203 (*self.0).rev_iter(key_range, read_options)
1204 }
1205
1206 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1207 (*self.0).new_flushed_snapshot_reader()
1208 }
1209
1210 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1211 (*self.0).get_table_watermark(vnode)
1212 }
1213
1214 fn insert(
1215 &mut self,
1216 key: TableKey<Bytes>,
1217 new_val: Bytes,
1218 old_val: Option<Bytes>,
1219 ) -> StorageResult<()> {
1220 (*self.0).insert(key, new_val, old_val)
1221 }
1222
1223 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1224 (*self.0).delete(key, old_val)
1225 }
1226
1227 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1228 (*self.0).update_vnode_bitmap(vnodes).await
1229 }
1230 }
1231
1232 impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1233 where
1234 StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1235 {
1236 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1237 self.as_mut().flush()
1238 }
1239
1240 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1241 self.as_mut().try_flush()
1242 }
1243
1244 fn init(
1245 &mut self,
1246 options: InitOptions,
1247 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1248 self.as_mut().init(options)
1249 }
1250
1251 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1252 self.as_mut().seal_current_epoch(next_epoch, opts)
1253 }
1254 }
1255
1256 #[async_trait::async_trait]
1257 pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1258 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()>;
1259 }
1260
1261 #[async_trait::async_trait]
1262 impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1263 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1264 self.insert(vec, info)
1265 }
1266 }
1267
1268 pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1269
1270 impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1271 fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
1272 self.0.insert(vec, info)
1273 }
1274 }
1275
1276 #[async_trait::async_trait]
1279 pub trait DynStateStoreReadVector: StaticSendSync {
1280 async fn nearest(
1281 &self,
1282 vec: VectorRef<'_>,
1283 options: VectorNearestOptions,
1284 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1285 }
1286
1287 #[async_trait::async_trait]
1288 impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1289 async fn nearest(
1290 &self,
1291 vec: VectorRef<'_>,
1292 options: VectorNearestOptions,
1293 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1294 use risingwave_common::types::ScalarRef;
1295 self.nearest(vec, options, |vec, distance, info| {
1296 (
1297 vec.to_owned_scalar(),
1298 distance,
1299 Bytes::copy_from_slice(info),
1300 )
1301 })
1302 .await
1303 }
1304 }
1305
1306 impl<P> StateStoreReadVector for StateStorePointer<P>
1307 where
1308 StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1309 {
1310 async fn nearest<'a, O: Send + 'a>(
1311 &'a self,
1312 vec: VectorRef<'a>,
1313 options: VectorNearestOptions,
1314 on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
1315 ) -> StorageResult<Vec<O>> {
1316 let output = self.as_ref().nearest(vec, options).await?;
1317 Ok(output
1318 .into_iter()
1319 .map(|(vec, distance, info)| {
1320 on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1321 })
1322 .collect())
1323 }
1324 }
1325
1326 pub trait DynStateStoreReadSnapshot:
1327 DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1328 {
1329 }
1330
1331 impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1332 for S
1333 {
1334 }
1335
1336 pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1337 #[async_trait::async_trait]
1338 pub trait DynStateStoreExt: StaticSendSync {
1339 async fn try_wait_epoch(
1340 &self,
1341 epoch: HummockReadEpoch,
1342 options: TryWaitEpochOptions,
1343 ) -> StorageResult<()>;
1344
1345 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1346 async fn new_read_snapshot(
1347 &self,
1348 epoch: HummockReadEpoch,
1349 options: NewReadSnapshotOptions,
1350 ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1351 async fn new_vector_writer(
1352 &self,
1353 options: NewVectorWriterOptions,
1354 ) -> BoxDynStateStoreWriteVector;
1355 }
1356
1357 #[async_trait::async_trait]
1358 impl<S: StateStore> DynStateStoreExt for S {
1359 async fn try_wait_epoch(
1360 &self,
1361 epoch: HummockReadEpoch,
1362 options: TryWaitEpochOptions,
1363 ) -> StorageResult<()> {
1364 self.try_wait_epoch(epoch, options).await
1365 }
1366
1367 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1368 StateStorePointer(Box::new(self.new_local(option).await))
1369 }
1370
1371 async fn new_read_snapshot(
1372 &self,
1373 epoch: HummockReadEpoch,
1374 options: NewReadSnapshotOptions,
1375 ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1376 Ok(StateStorePointer(Arc::new(
1377 self.new_read_snapshot(epoch, options).await?,
1378 )))
1379 }
1380
1381 async fn new_vector_writer(
1382 &self,
1383 options: NewVectorWriterOptions,
1384 ) -> BoxDynStateStoreWriteVector {
1385 StateStorePointer(Box::new(self.new_vector_writer(options).await))
1386 }
1387 }
1388
1389 pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1390
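// Helper macros generating `AsRef`/`AsMut` impls from a `StateStorePointer` over one dyn trait
// object to another dyn trait it also implements, so the `StateStorePointer` trait impls in
// this module can upcast through plain references.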
1391 macro_rules! state_store_pointer_dyn_as_ref {
1392 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1393 impl AsRef<dyn $target_dyn_trait>
1394 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1395 {
1396 fn as_ref(&self) -> &dyn $target_dyn_trait {
1397 (&*self.0) as _
1398 }
1399 }
1400 };
1401 }
1402
1403 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1404 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1405 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1406 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1407 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1408 state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1409
1410 macro_rules! state_store_pointer_dyn_as_mut {
1411 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1412 impl AsMut<dyn $target_dyn_trait>
1413 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1414 {
1415 fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1416 (&mut *self.0) as _
1417 }
1418 }
1419 };
1420 }
1421
1422 state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1423 state_store_pointer_dyn_as_mut!(
1424 Box<dyn DynStateStoreWriteVector>,
1425 DynStateStoreWriteEpochControl
1426 );
1427
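// Newtype over a smart pointer (`Arc` or `Box`) to one of the dyn traits above; the
// `as_ref`/`as_mut` impls generated by the macros make it usable wherever the statically
// dispatched state store traits are required.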
1428 #[derive(Clone)]
1429 pub struct StateStorePointer<P>(pub(crate) P);
1430
1431 impl<P> StateStoreGet for StateStorePointer<P>
1432 where
1433 StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1434 {
1435 async fn on_key_value<'a, O: Send + 'a>(
1436 &'a self,
1437 key: TableKey<Bytes>,
1438 read_options: ReadOptions,
1439 on_key_value_fn: impl KeyValueFn<'a, O>,
1440 ) -> StorageResult<Option<O>> {
1441 let option = self.as_ref().get_keyed_row(key, read_options).await?;
1442 option
1443 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1444 .transpose()
1445 }
1446 }
1447
1448 impl<P> StateStoreRead for StateStorePointer<P>
1449 where
1450 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1451 {
1452 type Iter = BoxStateStoreReadIter;
1453 type RevIter = BoxStateStoreReadIter;
1454
1455 fn iter(
1456 &self,
1457 key_range: TableKeyRange,
1458
1459 read_options: ReadOptions,
1460 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1461 self.as_ref().iter(key_range, read_options)
1462 }
1463
1464 fn rev_iter(
1465 &self,
1466 key_range: TableKeyRange,
1467
1468 read_options: ReadOptions,
1469 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1470 self.as_ref().rev_iter(key_range, read_options)
1471 }
1472 }
1473
1474 impl StateStoreReadLog for StateStoreDynRef {
1475 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1476
1477 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1478 (*self.0).next_epoch(epoch, options).await
1479 }
1480
1481 fn iter_log(
1482 &self,
1483 epoch_range: (u64, u64),
1484 key_range: TableKeyRange,
1485 options: ReadLogOptions,
1486 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1487 (*self.0).iter_log(epoch_range, key_range, options)
1488 }
1489 }
1490
1491 pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1492
1493 impl AsHummock for StateStoreDynRef {
1494 fn as_hummock(&self) -> Option<&HummockStorage> {
1495 (*self.0).as_hummock()
1496 }
1497 }
1498
1499 impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1500
1501 impl StateStore for StateStoreDynRef {
1502 type Local = BoxDynLocalStateStore;
1503 type ReadSnapshot = StateStoreReadSnapshotDynRef;
1504 type VectorWriter = BoxDynStateStoreWriteVector;
1505
1506 fn try_wait_epoch(
1507 &self,
1508 epoch: HummockReadEpoch,
1509 options: TryWaitEpochOptions,
1510 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1511 (*self.0).try_wait_epoch(epoch, options)
1512 }
1513
1514 fn new_local(
1515 &self,
1516 option: NewLocalOptions,
1517 ) -> impl Future<Output = Self::Local> + Send + '_ {
1518 (*self.0).new_local(option)
1519 }
1520
1521 async fn new_read_snapshot(
1522 &self,
1523 epoch: HummockReadEpoch,
1524 options: NewReadSnapshotOptions,
1525 ) -> StorageResult<Self::ReadSnapshot> {
1526 (*self.0).new_read_snapshot(epoch, options).await
1527 }
1528
1529 fn new_vector_writer(
1530 &self,
1531 options: NewVectorWriterOptions,
1532 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1533 (*self.0).new_vector_writer(options)
1534 }
1535 }
1536}