use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use enum_as_inner::EnumAsInner;
use foyer::{
    BlockEngineBuilder, CacheBuilder, DeviceBuilder, FifoPicker, FsDeviceBuilder,
    HybridCacheBuilder,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
use risingwave_common::catalog::TableId;
use risingwave_common::license::Feature;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common_service::RpcNotificationClient;
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
use risingwave_object_store::object::build_remote_object_store;
use thiserror_ext::AsReport;

use crate::StateStore;
use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
use crate::error::StorageResult;
use crate::hummock::all::AllRecentFilter;
use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::sharded::ShardedRecentFilter;
use crate::hummock::simple::SimpleRecentFilter;
use crate::hummock::{
    Block, BlockCacheEventListener, HummockError, HummockStorage, Sstable, SstableBlockIndex,
    SstableStore, SstableStoreConfig,
};
use crate::memory::MemoryStateStore;
use crate::memory::sled::SledStateStore;
use crate::monitor::{
    CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
    ObjectStoreMetrics,
};
use crate::opts::StorageOpts;

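/// Foyer reports its cache metrics via `mixtrics`; wrapping the global Prometheus registry here
/// routes those metrics into the same registry as the rest of RisingWave's metrics.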
static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
    Box::new(PrometheusMetricsRegistry::new(
        GLOBAL_METRICS_REGISTRY.clone(),
    ))
});

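/// Opaque `impl StateStore + AsHummock` aliases for the supported backends. The constructors
/// below are the only definition sites of these opaque types (`#[define_opaque]`), so every
/// store is built through the `may_dynamic_dispatch` / `may_verify` wrappers.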
mod opaque_type {
    use super::*;

    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    #[define_opaque(MemoryStateStoreType)]
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    #[define_opaque(HummockStorageType)]
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    #[define_opaque(SledStateStoreType)]
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
use opaque_type::{hummock, in_memory, sled};

#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;

fn monitored<S: StateStore>(
    state_store: S,
    storage_metrics: Arc<MonitoredStorageMetrics>,
) -> Monitored<S> {
    let inner = {
        #[cfg(feature = "hm-trace")]
        {
            crate::monitor::traced_store::TracedStateStore::new_global(state_store)
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            state_store
        }
    };
    inner.monitored(storage_metrics)
}

fn inner<S>(state_store: &Monitored<S>) -> &S {
    let inner = state_store.inner();
    {
        #[cfg(feature = "hm-trace")]
        {
            inner.inner()
        }
        #[cfg(not(feature = "hm-trace"))]
        {
            inner
        }
    }
}

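/// The type-erased state store handle used by the rest of the system. Each variant wraps its
/// backend in `MonitoredStateStore` (plus a tracing layer when the `hm-trace` feature is on).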
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    HummockStateStore(Monitored<HummockStorageType>),
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    SledStateStore(Monitored<SledStateStoreType>),
}

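/// In debug builds, erase the concrete store type behind a `StateStorePointer` (dynamic
/// dispatch); release builds keep static dispatch. Presumably this trades a vtable call for
/// faster compilation and smaller code in debug binaries.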
fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use crate::store_impl::dyn_state_store::StateStorePointer;
        StateStorePointer(Arc::new(state_store) as _)
    }
}

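/// In debug builds, optionally pair the real store with a temporary sled-backed "expected"
/// store (controlled by the `ENABLE_STATE_STORE_VERIFY` env var) so that operations can be
/// cross-checked by `verify::VerifyStateStore`. Release builds return the store unchanged.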
fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use std::marker::PhantomData;

        use risingwave_common::util::env_var::env_var_is_true;
        use tracing::info;

        use crate::store_impl::verify::VerifyStateStore;

        let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
            info!("enable verify state store");
            Some(SledStateStore::new_temp())
        } else {
            info!("verify state store is not enabled");
            None
        };
        VerifyStateStore {
            actual: state_store,
            expected,
            _phantom: PhantomData::<()>,
        }
    }
}

impl StateStoreImpl {
    fn in_memory(
        state_store: MemoryStateStore,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
    }

    pub fn hummock(
        state_store: HummockStorage,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
    }

    pub fn sled(
        state_store: SledStateStore,
        storage_metrics: Arc<MonitoredStorageMetrics>,
    ) -> Self {
        Self::SledStateStore(monitored(sled(state_store), storage_metrics))
    }

    pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
        Self::in_memory(MemoryStateStore::shared(), storage_metrics)
    }

    pub fn for_test() -> Self {
        Self::in_memory(
            MemoryStateStore::new(),
            Arc::new(MonitoredStorageMetrics::unused()),
        )
    }

    pub fn as_hummock(&self) -> Option<&HummockStorage> {
        match self {
            StateStoreImpl::HummockStateStore(hummock) => {
                Some(inner(hummock).as_hummock().expect("should be hummock"))
            }
            _ => None,
        }
    }
}

impl Debug for StateStoreImpl {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
            StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
            StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
        }
    }
}

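/// Dispatches over [`StateStoreImpl`], binding the inner monitored store to `$store` for the
/// duration of `$body`. The memory and sled arms are only usable in debug builds.
///
/// A minimal usage sketch (`state_store` and `run_on_store` are hypothetical):
///
/// ```ignore
/// dispatch_state_store!(state_store, store, {
///     // `store` is the concrete `Monitored<...>` state store here.
///     run_on_store(store).await
/// });
/// ```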
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}

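/// A debug/test-only decorator that mirrors every operation onto an optional "expected" store
/// and asserts that both stores agree. Constructed by `may_verify` above.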
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::Deref;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
    use tracing::log::warn;

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;

    #[expect(dead_code)]
    fn assert_result_eq<Item: PartialEq + Debug, E>(
        first: &std::result::Result<Item, E>,
        second: &std::result::Result<Item, E>,
    ) {
        match (first, second) {
            (Ok(first), Ok(second)) => {
                if first != second {
                    warn!("result different: {:?} {:?}", first, second);
                }
                assert_eq!(first, second);
            }
            (Err(_), Err(_)) => {}
            _ => {
                warn!("one success and one failed");
                panic!("result not equal");
            }
        }
    }

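    /// Pairs the store under test (`actual`) with an optional reference store (`expected`).
    /// Writes go to both; reads are performed on both and their results asserted equal.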
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        pub actual: A,
        pub expected: Option<E>,
        pub _phantom: PhantomData<T>,
    }

    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }

    impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
        async fn on_key_value<O: Send + 'static>(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<O>,
        ) -> StorageResult<Option<O>> {
            let actual: Option<(FullKey<Bytes>, Bytes)> = self
                .actual
                .on_key_value(key.clone(), read_options.clone(), |key, value| {
                    Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                })
                .await?;
            if let Some(expected) = &self.expected {
                let expected: Option<(FullKey<Bytes>, Bytes)> = expected
                    .on_key_value(key, read_options, |key, value| {
                        Ok((key.copy_into(), Bytes::copy_from_slice(value)))
                    })
                    .await?;
                assert_eq!(
                    actual
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
                    expected
                        .as_ref()
                        .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
                );
            }

            actual
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        fn nearest<O: Send + 'static>(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> impl StorageFuture<'_, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }

    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        type Iter = impl StateStoreReadIter;
        type RevIter = impl StateStoreReadIter;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }

    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
        type ChangeLogIter = impl StateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
            if let Some(expected) = &self.expected {
                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
            }
            Ok(actual)
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<Self::ChangeLogIter> {
            let actual = self
                .actual
                .iter_log(epoch_range, key_range.clone(), options.clone())
                .await?;
            let expected = if let Some(expected) = &self.expected {
                Some(expected.iter_log(epoch_range, key_range, options).await?)
            } else {
                None
            };

            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
        }
    }

    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }

    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }

    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        type Iter<'a> = impl StateStoreIter + 'a;
        type RevIter<'a> = impl StateStoreIter + 'a;

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }

    impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
        for VerifyStateStore<A, E>
    {
        async fn flush(&mut self) -> StorageResult<usize> {
            if let Some(expected) = &mut self.expected {
                expected.flush().await?;
            }
            self.actual.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.try_flush().await?;
            }
            self.actual.try_flush().await
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.actual.init(options.clone()).await?;
            if let Some(expected) = &mut self.expected {
                expected.init(options).await?;
            }
            Ok(())
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            if let Some(expected) = &mut self.expected {
                expected.seal_current_epoch(next_epoch, opts.clone());
            }
            self.actual.seal_current_epoch(next_epoch, opts);
        }
    }

    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
        type Local = VerifyStateStore<A::Local, E::Local>;
        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
        type VectorWriter = A::VectorWriter;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.actual.try_wait_epoch(epoch, options)
        }

        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_local(option.clone()).await)
            } else {
                None
            };
            VerifyStateStore {
                actual: self.actual.new_local(option).await,
                expected,
                _phantom: PhantomData::<()>,
            }
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_read_snapshot(epoch, options).await?)
            } else {
                None
            };
            Ok(VerifyStateStore {
                actual: self.actual.new_read_snapshot(epoch, options).await?,
                expected,
                _phantom: PhantomData::<()>,
            })
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            self.actual.new_vector_writer(options)
        }
    }

    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
}

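// Entry point for building a state store from its URL: `hummock+<object store>` assembles the
// full Hummock stack (foyer hybrid caches, SSTable store, meta client), while `in_memory` and
// `sled://<path>` build the debug-only backends.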
impl StateStoreImpl {
    #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
    #[allow(clippy::too_many_arguments)]
    #[expect(clippy::borrowed_box)]
    pub async fn new(
        s: &str,
        opts: Arc<StorageOpts>,
        hummock_meta_client: Arc<MonitoredHummockMetaClient>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        object_store_metrics: Arc<ObjectStoreMetrics>,
        storage_metrics: Arc<MonitoredStorageMetrics>,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_config: Option<await_tree::Config>,
        use_new_object_prefix_strategy: bool,
    ) -> StorageResult<Self> {
        const KB: usize = 1 << 10;
        const MB: usize = 1 << 20;

        let meta_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.meta")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .memory(opts.meta_cache_capacity_mb * MB)
                .with_shards(opts.meta_cache_shard_num)
                .with_eviction_config(opts.meta_cache_eviction_config.clone())
                .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                    u64::BITS as usize / 8 + value.estimate_size()
                })
                .storage();

            if !opts.meta_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.meta_file_cache_dir)
                        .with_capacity(opts.meta_file_cache_capacity_mb * MB)
                        .with_throttle(opts.meta_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_io_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.meta_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.meta_file_cache_indexer_shards)
                        .with_flushers(opts.meta_file_cache_flushers)
                        .with_reclaimers(opts.meta_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB)
                        .with_clean_block_threshold(
                            opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.meta_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.meta_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder.with_engine_config(engine_builder);
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        let block_cache = {
            let mut builder = HybridCacheBuilder::new()
                .with_name("foyer.data")
                .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
                .with_event_listener(Arc::new(BlockCacheEventListener::new(
                    state_store_metrics.clone(),
                )))
                .memory(opts.block_cache_capacity_mb * MB)
                .with_shards(opts.block_cache_shard_num)
                .with_eviction_config(opts.block_cache_eviction_config.clone())
                .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                    u64::BITS as usize * 2 / 8 + value.raw().len()
                })
                .storage();

            if !opts.data_file_cache_dir.is_empty() {
                if let Err(e) = Feature::ElasticDiskCache.check_available() {
                    tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
                } else {
                    let device = FsDeviceBuilder::new(&opts.data_file_cache_dir)
                        .with_capacity(opts.data_file_cache_capacity_mb * MB)
                        .with_throttle(opts.data_file_cache_throttle.clone())
                        .build()
                        .map_err(HummockError::foyer_io_error)?;
                    let engine_builder = BlockEngineBuilder::new(device)
                        .with_block_size(opts.data_file_cache_file_capacity_mb * MB)
                        .with_indexer_shards(opts.data_file_cache_indexer_shards)
                        .with_flushers(opts.data_file_cache_flushers)
                        .with_reclaimers(opts.data_file_cache_reclaimers)
                        .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB)
                        .with_clean_block_threshold(
                            opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
                        )
                        .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
                        .with_blob_index_size(opts.data_file_cache_blob_index_size_kb * KB)
                        .with_eviction_pickers(vec![Box::new(FifoPicker::new(
                            opts.data_file_cache_fifo_probation_ratio,
                        ))]);
                    builder = builder.with_engine_config(engine_builder);
                }
            }

            builder.build().await.map_err(HummockError::foyer_error)?
        };

        let vector_meta_cache = CacheBuilder::new(opts.vector_meta_cache_capacity_mb * MB)
            .with_shards(opts.vector_meta_cache_shard_num)
            .with_eviction_config(opts.vector_meta_cache_eviction_config.clone())
            .build();

        let vector_block_cache = CacheBuilder::new(opts.vector_block_cache_capacity_mb * MB)
            .with_shards(opts.vector_block_cache_shard_num)
            .with_eviction_config(opts.vector_block_cache_eviction_config.clone())
            .build();

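        // Pick the recent filter used by block cache refill: no filter without a data file
        // cache, an "everything is recent" filter when the recent filter is skipped, and
        // otherwise a simple or sharded filter depending on the configured shard count.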
        let recent_filter = if opts.data_file_cache_dir.is_empty() {
            Arc::new(NoneRecentFilter::default().into())
        } else if opts.cache_refill_skip_recent_filter {
            Arc::new(AllRecentFilter::default().into())
        } else if opts.cache_refill_recent_filter_shards == 1 {
            Arc::new(
                SimpleRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                )
                .into(),
            )
        } else {
            Arc::new(
                ShardedRecentFilter::new(
                    opts.cache_refill_recent_filter_layers,
                    Duration::from_millis(
                        opts.cache_refill_recent_filter_rotate_interval_ms as u64,
                    ),
                    opts.cache_refill_recent_filter_shards,
                )
                .into(),
            )
        };

        let store = match s {
            hummock if hummock.starts_with("hummock+") => {
                let object_store = build_remote_object_store(
                    hummock.strip_prefix("hummock+").unwrap(),
                    object_store_metrics.clone(),
                    "Hummock",
                    Arc::new(opts.object_store_config.clone()),
                )
                .await;

                let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
                    store: Arc::new(object_store),
                    path: opts.data_directory.clone(),
                    prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
                    max_prefetch_block_number: opts.max_prefetch_block_number,
                    recent_filter,
                    state_store_metrics: state_store_metrics.clone(),
                    use_new_object_prefix_strategy,

                    meta_cache,
                    block_cache,
                    vector_meta_cache,
                    vector_block_cache,
                }));
                let notification_client =
                    RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
                let compaction_catalog_manager_ref =
                    Arc::new(CompactionCatalogManager::new(Box::new(
                        RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
                    )));

                let inner = HummockStorage::new(
                    opts.clone(),
                    sstable_store,
                    hummock_meta_client.clone(),
                    notification_client,
                    compaction_catalog_manager_ref,
                    state_store_metrics.clone(),
                    compactor_metrics.clone(),
                    await_tree_config,
                )
                .await?;

                StateStoreImpl::hummock(inner, storage_metrics)
            }

            "in_memory" | "in-memory" => {
                tracing::warn!(
                    "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
            }

            sled if sled.starts_with("sled://") => {
                tracing::warn!(
                    "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
                );
                let path = sled.strip_prefix("sled://").unwrap();
                StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
            }

            other => unimplemented!("{} state store is not supported", other),
        };

        Ok(store)
    }
}

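/// Downcast hook to the underlying `HummockStorage`, with a default `sync` that forwards to
/// Hummock when present and reports an empty `SyncResult` for the test-only backends.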
pub trait AsHummock: Send + Sync {
    fn as_hummock(&self) -> Option<&HummockStorage>;

    fn sync(
        &self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
    ) -> BoxFuture<'_, StorageResult<SyncResult>> {
        async move {
            if let Some(hummock) = self.as_hummock() {
                hummock.sync(sync_table_epochs).await
            } else {
                Ok(SyncResult::default())
            }
        }
        .boxed()
    }
}

impl AsHummock for HummockStorage {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}

impl AsHummock for MemoryStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}

impl AsHummock for SledStateStore {
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}

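/// Object-safe ("dyn-compatible") mirrors of the state store traits, plus the
/// `StateStorePointer` wrapper that implements the real traits on top of them. Compiled only in
/// debug builds, where `may_dynamic_dispatch` boxes the concrete store.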
#[cfg(debug_assertions)]
mod dyn_state_store {
    use std::future::Future;
    use std::ops::DerefMut;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;
    use crate::vector::VectorDistance;

    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }

    #[async_trait::async_trait]
    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            self.try_next().await
        }
    }

    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            self.deref_mut().try_next()
        }
    }

    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;

    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }

    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;

    #[async_trait::async_trait]
    impl<S: StateStoreGet> DynStateStoreGet for S {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>> {
            self.on_key_value(key, read_options, move |key, value| {
                Ok((key.copy_into(), Bytes::copy_from_slice(value)))
            })
            .await
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreRead> DynStateStoreRead for S {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            self.next_epoch(epoch, options).await
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
            Ok(Box::new(
                self.iter_log(epoch_range, key_range, options).await?,
            ))
        }
    }

    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;

    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
        async fn flush(&mut self) -> StorageResult<usize>;

        async fn try_flush(&mut self) -> StorageResult<()>;

        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
    }

    #[async_trait::async_trait]
    impl<S: LocalStateStore> DynLocalStateStore for S {
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            self.insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            self.delete(key, old_val)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            self.update_vnode_bitmap(vnodes).await
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            self.get_table_watermark(vnode)
        }
    }

    #[async_trait::async_trait]
    impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
        async fn flush(&mut self) -> StorageResult<usize> {
            self.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            self.try_flush().await
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.init(options).await
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.seal_current_epoch(next_epoch, opts)
        }
    }

    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;

    impl LocalStateStore for BoxDynLocalStateStore {
        type FlushedSnapshotReader = StateStoreReadDynRef;
        type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
        type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;

        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            (*self.0).iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            (*self.0).rev_iter(key_range, read_options)
        }

        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            (*self.0).new_flushed_snapshot_reader()
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            (*self.0).get_table_watermark(vnode)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            (*self.0).insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            (*self.0).delete(key, old_val)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            (*self.0).update_vnode_bitmap(vnodes).await
        }
    }

    impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
    where
        StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
    {
        fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
            self.as_mut().flush()
        }

        fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().try_flush()
        }

        fn init(
            &mut self,
            options: InitOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.as_mut().init(options)
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.as_mut().seal_current_epoch(next_epoch, opts)
        }
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
    }

    #[async_trait::async_trait]
    impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
            self.insert(vec, info)
        }
    }

    pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;

    impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
        fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
            self.0.insert(vec, info)
        }
    }

    #[async_trait::async_trait]
    pub trait DynStateStoreReadVector: StaticSendSync {
        async fn nearest(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
    }

    #[async_trait::async_trait]
    impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
        async fn nearest(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
        ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
            use risingwave_common::types::ScalarRef;
            self.nearest(vec, options, |vec, distance, info| {
                (
                    vec.to_owned_scalar(),
                    distance,
                    Bytes::copy_from_slice(info),
                )
            })
            .await
        }
    }

    impl<P> StateStoreReadVector for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
    {
        async fn nearest<O: Send + 'static>(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> StorageResult<Vec<O>> {
            let output = self.as_ref().nearest(vec, options).await?;
            Ok(output
                .into_iter()
                .map(|(vec, distance, info)| {
                    on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
                })
                .collect())
        }
    }

    pub trait DynStateStoreReadSnapshot:
        DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
    {
    }

    impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
        for S
    {
    }

    pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;

    #[async_trait::async_trait]
    pub trait DynStateStoreExt: StaticSendSync {
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()>;

        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef>;
        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector;
    }

    #[async_trait::async_trait]
    impl<S: StateStore> DynStateStoreExt for S {
        async fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> StorageResult<()> {
            self.try_wait_epoch(epoch, options).await
        }

        async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
            StateStorePointer(Box::new(self.new_local(option).await))
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<StateStoreReadSnapshotDynRef> {
            Ok(StateStorePointer(Arc::new(
                self.new_read_snapshot(epoch, options).await?,
            )))
        }

        async fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> BoxDynStateStoreWriteVector {
            StateStorePointer(Box::new(self.new_vector_writer(options).await))
        }
    }

    pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;

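    // These macros generate the `AsRef`/`AsMut` conversions from a
    // `StateStorePointer<SmartPtr<dyn Source>>` to the narrower dyn traits that the generic
    // `StateStorePointer` impls bound on, presumably to avoid relying on dyn-trait upcasting.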
    macro_rules! state_store_pointer_dyn_as_ref {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsRef<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_ref(&self) -> &dyn $target_dyn_trait {
                    (&*self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
    state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
    state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);

    macro_rules! state_store_pointer_dyn_as_mut {
        ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
            impl AsMut<dyn $target_dyn_trait>
                for StateStorePointer<$pointer<dyn $source_dyn_trait>>
            {
                fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
                    (&mut *self.0) as _
                }
            }
        };
    }

    state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
    state_store_pointer_dyn_as_mut!(
        Box<dyn DynStateStoreWriteVector>,
        DynStateStoreWriteEpochControl
    );

    #[derive(Clone)]
    pub struct StateStorePointer<P>(pub(crate) P);

    impl<P> StateStoreGet for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
    {
        async fn on_key_value<O: Send + 'static>(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
            on_key_value_fn: impl KeyValueFn<O>,
        ) -> StorageResult<Option<O>> {
            let option = self.as_ref().get_keyed_row(key, read_options).await?;
            option
                .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
                .transpose()
        }
    }

    impl<P> StateStoreRead for StateStorePointer<P>
    where
        StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
    {
        type Iter = BoxStateStoreReadIter;
        type RevIter = BoxStateStoreReadIter;

        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            self.as_ref().iter(key_range, read_options)
        }

        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            self.as_ref().rev_iter(key_range, read_options)
        }
    }

    impl StateStoreReadLog for StateStoreDynRef {
        type ChangeLogIter = BoxStateStoreReadChangeLogIter;

        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            (*self.0).next_epoch(epoch, options).await
        }

        fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
            (*self.0).iter_log(epoch_range, key_range, options)
        }
    }

    pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}

    impl AsHummock for StateStoreDynRef {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            (*self.0).as_hummock()
        }
    }

    impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}

    impl StateStore for StateStoreDynRef {
        type Local = BoxDynLocalStateStore;
        type ReadSnapshot = StateStoreReadSnapshotDynRef;
        type VectorWriter = BoxDynStateStoreWriteVector;

        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            (*self.0).try_wait_epoch(epoch, options)
        }

        fn new_local(
            &self,
            option: NewLocalOptions,
        ) -> impl Future<Output = Self::Local> + Send + '_ {
            (*self.0).new_local(option)
        }

        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            (*self.0).new_read_snapshot(epoch, options).await
        }

        fn new_vector_writer(
            &self,
            options: NewVectorWriterOptions,
        ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
            (*self.0).new_vector_writer(options)
        }
    }
}