1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions};
22use futures::FutureExt;
23use futures::future::BoxFuture;
24use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
25use risingwave_common::catalog::TableId;
26use risingwave_common::license::Feature;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use risingwave_common_service::RpcNotificationClient;
29use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
30use risingwave_object_store::object::build_remote_object_store;
31use thiserror_ext::AsReport;
32
33use crate::StateStore;
34use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
35use crate::error::StorageResult;
36use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
37use crate::hummock::{
38 Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
39 SstableBlockIndex, SstableStore, SstableStoreConfig,
40};
41use crate::memory::MemoryStateStore;
42use crate::memory::sled::SledStateStore;
43use crate::monitor::{
44 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
45 ObjectStoreMetrics,
46};
47use crate::opts::StorageOpts;
48
49static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
50 Box::new(PrometheusMetricsRegistry::new(
51 GLOBAL_METRICS_REGISTRY.clone(),
52 ))
53});
54
55mod opaque_type {
56 use super::*;
57
58 pub type HummockStorageType = impl StateStore + AsHummock;
59 pub type MemoryStateStoreType = impl StateStore + AsHummock;
60 pub type SledStateStoreType = impl StateStore + AsHummock;
61
62 pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
63 may_dynamic_dispatch(state_store)
64 }
65
66 pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
67 may_dynamic_dispatch(may_verify(state_store))
68 }
69
70 pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
71 may_dynamic_dispatch(state_store)
72 }
73}
74pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
75use opaque_type::{hummock, in_memory, sled};
76
77#[cfg(feature = "hm-trace")]
78type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;
79
80#[cfg(not(feature = "hm-trace"))]
81type Monitored<S> = MonitoredStateStore<S>;
82
83fn monitored<S: StateStore>(
84 state_store: S,
85 storage_metrics: Arc<MonitoredStorageMetrics>,
86) -> Monitored<S> {
87 let inner = {
88 #[cfg(feature = "hm-trace")]
89 {
90 crate::monitor::traced_store::TracedStateStore::new_global(state_store)
91 }
92 #[cfg(not(feature = "hm-trace"))]
93 {
94 state_store
95 }
96 };
97 inner.monitored(storage_metrics)
98}
99
100fn inner<S>(state_store: &Monitored<S>) -> &S {
101 let inner = state_store.inner();
102 {
103 #[cfg(feature = "hm-trace")]
104 {
105 inner.inner()
106 }
107 #[cfg(not(feature = "hm-trace"))]
108 {
109 inner
110 }
111 }
112}
113
114#[derive(Clone, EnumAsInner)]
116#[allow(clippy::enum_variant_names)]
117pub enum StateStoreImpl {
118 HummockStateStore(Monitored<HummockStorageType>),
127 MemoryStateStore(Monitored<MemoryStateStoreType>),
132 SledStateStore(Monitored<SledStateStoreType>),
133}
134
135fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
136 #[cfg(not(debug_assertions))]
137 {
138 state_store
139 }
140 #[cfg(debug_assertions)]
141 {
142 use crate::store_impl::dyn_state_store::StateStorePointer;
143 StateStorePointer(Arc::new(state_store) as _)
144 }
145}
146
147fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
148 #[cfg(not(debug_assertions))]
149 {
150 state_store
151 }
152 #[cfg(debug_assertions)]
153 {
154 use std::marker::PhantomData;
155
156 use risingwave_common::util::env_var::env_var_is_true;
157 use tracing::info;
158
159 use crate::store_impl::verify::VerifyStateStore;
160
161 let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
162 info!("enable verify state store");
163 Some(SledStateStore::new_temp())
164 } else {
165 info!("verify state store is not enabled");
166 None
167 };
168 VerifyStateStore {
169 actual: state_store,
170 expected,
171 _phantom: PhantomData::<()>,
172 }
173 }
174}
175
176impl StateStoreImpl {
177 fn in_memory(
178 state_store: MemoryStateStore,
179 storage_metrics: Arc<MonitoredStorageMetrics>,
180 ) -> Self {
181 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
183 }
184
185 pub fn hummock(
186 state_store: HummockStorage,
187 storage_metrics: Arc<MonitoredStorageMetrics>,
188 ) -> Self {
189 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
191 }
192
193 pub fn sled(
194 state_store: SledStateStore,
195 storage_metrics: Arc<MonitoredStorageMetrics>,
196 ) -> Self {
197 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
198 }
199
200 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
201 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
202 }
203
204 pub fn for_test() -> Self {
205 Self::in_memory(
206 MemoryStateStore::new(),
207 Arc::new(MonitoredStorageMetrics::unused()),
208 )
209 }
210
211 pub fn as_hummock(&self) -> Option<&HummockStorage> {
212 match self {
213 StateStoreImpl::HummockStateStore(hummock) => {
214 Some(inner(hummock).as_hummock().expect("should be hummock"))
215 }
216 _ => None,
217 }
218 }
219}
220
221impl Debug for StateStoreImpl {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 match self {
224 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
225 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
226 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
227 }
228 }
229}
230
231#[macro_export]
232macro_rules! dispatch_state_store {
233 ($impl:expr, $store:ident, $body:tt) => {{
234 use $crate::store_impl::StateStoreImpl;
235
236 match $impl {
237 StateStoreImpl::MemoryStateStore($store) => {
238 #[cfg(debug_assertions)]
241 {
242 $body
243 }
244 #[cfg(not(debug_assertions))]
245 {
246 let _store = $store;
247 unimplemented!("memory state store should never be used in release mode");
248 }
249 }
250
251 StateStoreImpl::SledStateStore($store) => {
252 #[cfg(debug_assertions)]
255 {
256 $body
257 }
258 #[cfg(not(debug_assertions))]
259 {
260 let _store = $store;
261 unimplemented!("sled state store should never be used in release mode");
262 }
263 }
264
265 StateStoreImpl::HummockStateStore($store) => $body,
266 }
267 }};
268}
269
270#[cfg(any(debug_assertions, test, feature = "test"))]
271pub mod verify {
272 use std::fmt::Debug;
273 use std::future::Future;
274 use std::marker::PhantomData;
275 use std::ops::Deref;
276 use std::sync::Arc;
277
278 use bytes::Bytes;
279 use risingwave_common::bitmap::Bitmap;
280 use risingwave_common::hash::VirtualNode;
281 use risingwave_hummock_sdk::HummockReadEpoch;
282 use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
283 use tracing::log::warn;
284
285 use crate::error::StorageResult;
286 use crate::hummock::HummockStorage;
287 use crate::store::*;
288 use crate::store_impl::AsHummock;
289
290 #[expect(dead_code)]
291 fn assert_result_eq<Item: PartialEq + Debug, E>(
292 first: &std::result::Result<Item, E>,
293 second: &std::result::Result<Item, E>,
294 ) {
295 match (first, second) {
296 (Ok(first), Ok(second)) => {
297 if first != second {
298 warn!("result different: {:?} {:?}", first, second);
299 }
300 assert_eq!(first, second);
301 }
302 (Err(_), Err(_)) => {}
303 _ => {
304 warn!("one success and one failed");
305 panic!("result not equal");
306 }
307 }
308 }
309
310 #[derive(Clone)]
311 pub struct VerifyStateStore<A, E, T = ()> {
312 pub actual: A,
313 pub expected: Option<E>,
314 pub _phantom: PhantomData<T>,
315 }
316
317 impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
318 fn as_hummock(&self) -> Option<&HummockStorage> {
319 self.actual.as_hummock()
320 }
321 }
322
323 impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
324 async fn on_key_value<O: Send + 'static>(
325 &self,
326 key: TableKey<Bytes>,
327 read_options: ReadOptions,
328 on_key_value_fn: impl KeyValueFn<O>,
329 ) -> StorageResult<Option<O>> {
330 let actual: Option<(FullKey<Bytes>, Bytes)> = self
331 .actual
332 .on_key_value(key.clone(), read_options.clone(), |key, value| {
333 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
334 })
335 .await?;
336 if let Some(expected) = &self.expected {
337 let expected: Option<(FullKey<Bytes>, Bytes)> = expected
338 .on_key_value(key, read_options, |key, value| {
339 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
340 })
341 .await?;
342 assert_eq!(
343 actual
344 .as_ref()
345 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
346 expected
347 .as_ref()
348 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
349 );
350 }
351
352 actual
353 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
354 .transpose()
355 }
356 }
357
358 impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
359 for VerifyStateStore<A, E>
360 {
361 fn nearest<O: Send + 'static>(
362 &self,
363 vec: Vector,
364 options: VectorNearestOptions,
365 on_nearest_item_fn: impl OnNearestItemFn<O>,
366 ) -> impl StorageFuture<'_, Vec<O>> {
367 self.actual.nearest(vec, options, on_nearest_item_fn)
368 }
369 }
370
371 impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
372 type Iter = impl StateStoreReadIter;
373 type RevIter = impl StateStoreReadIter;
374
375 #[expect(clippy::manual_async_fn)]
378 fn iter(
379 &self,
380 key_range: TableKeyRange,
381
382 read_options: ReadOptions,
383 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
384 async move {
385 let actual = self
386 .actual
387 .iter(key_range.clone(), read_options.clone())
388 .await?;
389 let expected = if let Some(expected) = &self.expected {
390 Some(expected.iter(key_range, read_options).await?)
391 } else {
392 None
393 };
394
395 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
396 }
397 }
398
399 #[expect(clippy::manual_async_fn)]
400 fn rev_iter(
401 &self,
402 key_range: TableKeyRange,
403
404 read_options: ReadOptions,
405 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
406 async move {
407 let actual = self
408 .actual
409 .rev_iter(key_range.clone(), read_options.clone())
410 .await?;
411 let expected = if let Some(expected) = &self.expected {
412 Some(expected.rev_iter(key_range, read_options).await?)
413 } else {
414 None
415 };
416
417 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
418 }
419 }
420 }
421
422 impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
423 type ChangeLogIter = impl StateStoreReadChangeLogIter;
424
425 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
426 let actual = self.actual.next_epoch(epoch, options.clone()).await?;
427 if let Some(expected) = &self.expected {
428 assert_eq!(actual, expected.next_epoch(epoch, options).await?);
429 }
430 Ok(actual)
431 }
432
433 async fn iter_log(
434 &self,
435 epoch_range: (u64, u64),
436 key_range: TableKeyRange,
437 options: ReadLogOptions,
438 ) -> StorageResult<Self::ChangeLogIter> {
439 let actual = self
440 .actual
441 .iter_log(epoch_range, key_range.clone(), options.clone())
442 .await?;
443 let expected = if let Some(expected) = &self.expected {
444 Some(expected.iter_log(epoch_range, key_range, options).await?)
445 } else {
446 None
447 };
448
449 Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
450 }
451 }
452
453 impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
454 for VerifyStateStore<A, E, T>
455 where
456 for<'a> T::ItemRef<'a>: PartialEq + Debug,
457 {
458 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
459 let actual = self.actual.try_next().await?;
460 if let Some(expected) = self.expected.as_mut() {
461 let expected = expected.try_next().await?;
462 assert_eq!(actual, expected);
463 }
464 Ok(actual)
465 }
466 }
467
468 fn verify_iter<T: IterItem>(
469 actual: impl StateStoreIter<T>,
470 expected: Option<impl StateStoreIter<T>>,
471 ) -> impl StateStoreIter<T>
472 where
473 for<'a> T::ItemRef<'a>: PartialEq + Debug,
474 {
475 VerifyStateStore {
476 actual,
477 expected,
478 _phantom: PhantomData::<T>,
479 }
480 }
481
482 impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
483 type FlushedSnapshotReader =
484 VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
485
486 type Iter<'a> = impl StateStoreIter + 'a;
487 type RevIter<'a> = impl StateStoreIter + 'a;
488
489 #[expect(clippy::manual_async_fn)]
490 fn iter(
491 &self,
492 key_range: TableKeyRange,
493 read_options: ReadOptions,
494 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
495 async move {
496 let actual = self
497 .actual
498 .iter(key_range.clone(), read_options.clone())
499 .await?;
500 let expected = if let Some(expected) = &self.expected {
501 Some(expected.iter(key_range, read_options).await?)
502 } else {
503 None
504 };
505
506 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
507 }
508 }
509
510 #[expect(clippy::manual_async_fn)]
511 fn rev_iter(
512 &self,
513 key_range: TableKeyRange,
514 read_options: ReadOptions,
515 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
516 async move {
517 let actual = self
518 .actual
519 .rev_iter(key_range.clone(), read_options.clone())
520 .await?;
521 let expected = if let Some(expected) = &self.expected {
522 Some(expected.rev_iter(key_range, read_options).await?)
523 } else {
524 None
525 };
526
527 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
528 }
529 }
530
531 fn insert(
532 &mut self,
533 key: TableKey<Bytes>,
534 new_val: Bytes,
535 old_val: Option<Bytes>,
536 ) -> StorageResult<()> {
537 if let Some(expected) = &mut self.expected {
538 expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
539 }
540 self.actual.insert(key, new_val, old_val)?;
541
542 Ok(())
543 }
544
545 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
546 if let Some(expected) = &mut self.expected {
547 expected.delete(key.clone(), old_val.clone())?;
548 }
549 self.actual.delete(key, old_val)?;
550 Ok(())
551 }
552
553 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
554 let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
555 if let Some(expected) = &mut self.expected {
556 assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
557 }
558 Ok(ret)
559 }
560
561 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
562 let ret = self.actual.get_table_watermark(vnode);
563 if let Some(expected) = &self.expected {
564 assert_eq!(ret, expected.get_table_watermark(vnode));
565 }
566 ret
567 }
568
569 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
570 VerifyStateStore {
571 actual: self.actual.new_flushed_snapshot_reader(),
572 expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
573 _phantom: Default::default(),
574 }
575 }
576 }
577
578 impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
579 for VerifyStateStore<A, E>
580 {
581 async fn flush(&mut self) -> StorageResult<usize> {
582 if let Some(expected) = &mut self.expected {
583 expected.flush().await?;
584 }
585 self.actual.flush().await
586 }
587
588 async fn try_flush(&mut self) -> StorageResult<()> {
589 if let Some(expected) = &mut self.expected {
590 expected.try_flush().await?;
591 }
592 self.actual.try_flush().await
593 }
594
595 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
596 self.actual.init(options.clone()).await?;
597 if let Some(expected) = &mut self.expected {
598 expected.init(options).await?;
599 }
600 Ok(())
601 }
602
603 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
604 if let Some(expected) = &mut self.expected {
605 expected.seal_current_epoch(next_epoch, opts.clone());
606 }
607 self.actual.seal_current_epoch(next_epoch, opts);
608 }
609 }
610
611 impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
612 type Local = VerifyStateStore<A::Local, E::Local>;
613 type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
614 type VectorWriter = A::VectorWriter;
615
616 fn try_wait_epoch(
617 &self,
618 epoch: HummockReadEpoch,
619 options: TryWaitEpochOptions,
620 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
621 self.actual.try_wait_epoch(epoch, options)
622 }
623
624 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
625 let expected = if let Some(expected) = &self.expected {
626 Some(expected.new_local(option.clone()).await)
627 } else {
628 None
629 };
630 VerifyStateStore {
631 actual: self.actual.new_local(option).await,
632 expected,
633 _phantom: PhantomData::<()>,
634 }
635 }
636
637 async fn new_read_snapshot(
638 &self,
639 epoch: HummockReadEpoch,
640 options: NewReadSnapshotOptions,
641 ) -> StorageResult<Self::ReadSnapshot> {
642 let expected = if let Some(expected) = &self.expected {
643 Some(expected.new_read_snapshot(epoch, options).await?)
644 } else {
645 None
646 };
647 Ok(VerifyStateStore {
648 actual: self.actual.new_read_snapshot(epoch, options).await?,
649 expected,
650 _phantom: PhantomData::<()>,
651 })
652 }
653
654 fn new_vector_writer(
655 &self,
656 options: NewVectorWriterOptions,
657 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
658 self.actual.new_vector_writer(options)
659 }
660 }
661
662 impl<A, E> Deref for VerifyStateStore<A, E> {
663 type Target = A;
664
665 fn deref(&self) -> &Self::Target {
666 &self.actual
667 }
668 }
669}
670
671impl StateStoreImpl {
672 #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
673 #[allow(clippy::too_many_arguments)]
674 #[expect(clippy::borrowed_box)]
675 pub async fn new(
676 s: &str,
677 opts: Arc<StorageOpts>,
678 hummock_meta_client: Arc<MonitoredHummockMetaClient>,
679 state_store_metrics: Arc<HummockStateStoreMetrics>,
680 object_store_metrics: Arc<ObjectStoreMetrics>,
681 storage_metrics: Arc<MonitoredStorageMetrics>,
682 compactor_metrics: Arc<CompactorMetrics>,
683 await_tree_config: Option<await_tree::Config>,
684 use_new_object_prefix_strategy: bool,
685 ) -> StorageResult<Self> {
686 const KB: usize = 1 << 10;
687 const MB: usize = 1 << 20;
688
689 let meta_cache = {
690 let mut builder = HybridCacheBuilder::new()
691 .with_name("foyer.meta")
692 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
693 .memory(opts.meta_cache_capacity_mb * MB)
694 .with_shards(opts.meta_cache_shard_num)
695 .with_eviction_config(opts.meta_cache_eviction_config.clone())
696 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
697 u64::BITS as usize / 8 + value.estimate_size()
698 })
699 .storage(Engine::Large);
700
701 if !opts.meta_file_cache_dir.is_empty() {
702 if let Err(e) = Feature::ElasticDiskCache.check_available() {
703 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
704 } else {
705 builder = builder
706 .with_device_options(
707 DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
708 .with_capacity(opts.meta_file_cache_capacity_mb * MB)
709 .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
710 .with_throttle(opts.meta_file_cache_throttle.clone()),
711 )
712 .with_recover_mode(opts.meta_file_cache_recover_mode)
713 .with_compression(opts.meta_file_cache_compression)
714 .with_runtime_options(opts.meta_file_cache_runtime_config.clone())
715 .with_large_object_disk_cache_options(
716 LargeEngineOptions::new()
717 .with_indexer_shards(opts.meta_file_cache_indexer_shards)
718 .with_flushers(opts.meta_file_cache_flushers)
719 .with_reclaimers(opts.meta_file_cache_reclaimers)
720 .with_buffer_pool_size(
721 opts.meta_file_cache_flush_buffer_threshold_mb * MB,
722 ) .with_clean_region_threshold(
724 opts.meta_file_cache_reclaimers
725 + opts.meta_file_cache_reclaimers / 2,
726 )
727 .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
728 .with_blob_index_size(16 * KB)
729 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
730 opts.meta_file_cache_fifo_probation_ratio,
731 ))]),
732 );
733 }
734 }
735
736 builder.build().await.map_err(HummockError::foyer_error)?
737 };
738
739 let block_cache = {
740 let mut builder = HybridCacheBuilder::new()
741 .with_name("foyer.data")
742 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
743 .with_event_listener(Arc::new(BlockCacheEventListener::new(
744 state_store_metrics.clone(),
745 )))
746 .memory(opts.block_cache_capacity_mb * MB)
747 .with_shards(opts.block_cache_shard_num)
748 .with_eviction_config(opts.block_cache_eviction_config.clone())
749 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
750 u64::BITS as usize * 2 / 8 + value.raw().len()
752 })
753 .storage(Engine::Large);
754
755 if !opts.data_file_cache_dir.is_empty() {
756 if let Err(e) = Feature::ElasticDiskCache.check_available() {
757 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
758 } else {
759 builder = builder
760 .with_device_options(
761 DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
762 .with_capacity(opts.data_file_cache_capacity_mb * MB)
763 .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
764 .with_throttle(opts.data_file_cache_throttle.clone()),
765 )
766 .with_recover_mode(opts.data_file_cache_recover_mode)
767 .with_compression(opts.data_file_cache_compression)
768 .with_runtime_options(opts.data_file_cache_runtime_config.clone())
769 .with_large_object_disk_cache_options(
770 LargeEngineOptions::new()
771 .with_indexer_shards(opts.data_file_cache_indexer_shards)
772 .with_flushers(opts.data_file_cache_flushers)
773 .with_reclaimers(opts.data_file_cache_reclaimers)
774 .with_buffer_pool_size(
775 opts.data_file_cache_flush_buffer_threshold_mb * MB,
776 ) .with_clean_region_threshold(
778 opts.data_file_cache_reclaimers
779 + opts.data_file_cache_reclaimers / 2,
780 )
781 .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
782 .with_blob_index_size(16 * KB)
783 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
784 opts.data_file_cache_fifo_probation_ratio,
785 ))]),
786 );
787 }
788 }
789
790 builder.build().await.map_err(HummockError::foyer_error)?
791 };
792
793 let recent_filter = if opts.data_file_cache_dir.is_empty() {
794 None
795 } else {
796 Some(Arc::new(RecentFilter::new(
797 opts.cache_refill_recent_filter_layers,
798 Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
799 )))
800 };
801
802 let store = match s {
803 hummock if hummock.starts_with("hummock+") => {
804 let object_store = build_remote_object_store(
805 hummock.strip_prefix("hummock+").unwrap(),
806 object_store_metrics.clone(),
807 "Hummock",
808 Arc::new(opts.object_store_config.clone()),
809 )
810 .await;
811
812 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
813 store: Arc::new(object_store),
814 path: opts.data_directory.clone(),
815 prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
816 max_prefetch_block_number: opts.max_prefetch_block_number,
817 recent_filter,
818 state_store_metrics: state_store_metrics.clone(),
819 use_new_object_prefix_strategy,
820
821 meta_cache,
822 block_cache,
823 }));
824 let notification_client =
825 RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
826 let compaction_catalog_manager_ref =
827 Arc::new(CompactionCatalogManager::new(Box::new(
828 RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
829 )));
830
831 let inner = HummockStorage::new(
832 opts.clone(),
833 sstable_store,
834 hummock_meta_client.clone(),
835 notification_client,
836 compaction_catalog_manager_ref,
837 state_store_metrics.clone(),
838 compactor_metrics.clone(),
839 await_tree_config,
840 )
841 .await?;
842
843 StateStoreImpl::hummock(inner, storage_metrics)
844 }
845
846 "in_memory" | "in-memory" => {
847 tracing::warn!(
848 "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
849 );
850 StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
851 }
852
853 sled if sled.starts_with("sled://") => {
854 tracing::warn!(
855 "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
856 );
857 let path = sled.strip_prefix("sled://").unwrap();
858 StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
859 }
860
861 other => unimplemented!("{} state store is not supported", other),
862 };
863
864 Ok(store)
865 }
866}
867
868pub trait AsHummock: Send + Sync {
869 fn as_hummock(&self) -> Option<&HummockStorage>;
870
871 fn sync(
872 &self,
873 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
874 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
875 async move {
876 if let Some(hummock) = self.as_hummock() {
877 hummock.sync(sync_table_epochs).await
878 } else {
879 Ok(SyncResult::default())
880 }
881 }
882 .boxed()
883 }
884}
885
886impl AsHummock for HummockStorage {
887 fn as_hummock(&self) -> Option<&HummockStorage> {
888 Some(self)
889 }
890}
891
892impl AsHummock for MemoryStateStore {
893 fn as_hummock(&self) -> Option<&HummockStorage> {
894 None
895 }
896}
897
898impl AsHummock for SledStateStore {
899 fn as_hummock(&self) -> Option<&HummockStorage> {
900 None
901 }
902}
903
904#[cfg(debug_assertions)]
905mod dyn_state_store {
906 use std::future::Future;
907 use std::ops::DerefMut;
908 use std::sync::Arc;
909
910 use bytes::Bytes;
911 use risingwave_common::bitmap::Bitmap;
912 use risingwave_common::hash::VirtualNode;
913 use risingwave_hummock_sdk::HummockReadEpoch;
914 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
915
916 use crate::error::StorageResult;
917 use crate::hummock::HummockStorage;
918 use crate::store::*;
919 use crate::store_impl::AsHummock;
920 use crate::vector::VectorDistance;
921
922 #[async_trait::async_trait]
923 pub trait DynStateStoreIter<T: IterItem>: Send {
924 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
925 }
926
927 #[async_trait::async_trait]
928 impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
929 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
930 self.try_next().await
931 }
932 }
933
934 pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
935 impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
936 fn try_next(
937 &mut self,
938 ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
939 self.deref_mut().try_next()
940 }
941 }
942
943 pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
946 pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
947
948 #[async_trait::async_trait]
949 pub trait DynStateStoreGet: StaticSendSync {
950 async fn get_keyed_row(
951 &self,
952 key: TableKey<Bytes>,
953 read_options: ReadOptions,
954 ) -> StorageResult<Option<StateStoreKeyedRow>>;
955 }
956
957 #[async_trait::async_trait]
958 pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
959 async fn iter(
960 &self,
961 key_range: TableKeyRange,
962
963 read_options: ReadOptions,
964 ) -> StorageResult<BoxStateStoreReadIter>;
965
966 async fn rev_iter(
967 &self,
968 key_range: TableKeyRange,
969
970 read_options: ReadOptions,
971 ) -> StorageResult<BoxStateStoreReadIter>;
972 }
973
974 #[async_trait::async_trait]
975 pub trait DynStateStoreReadLog: StaticSendSync {
976 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
977 async fn iter_log(
978 &self,
979 epoch_range: (u64, u64),
980 key_range: TableKeyRange,
981 options: ReadLogOptions,
982 ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
983 }
984
985 pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
986
987 #[async_trait::async_trait]
988 impl<S: StateStoreGet> DynStateStoreGet for S {
989 async fn get_keyed_row(
990 &self,
991 key: TableKey<Bytes>,
992 read_options: ReadOptions,
993 ) -> StorageResult<Option<StateStoreKeyedRow>> {
994 self.on_key_value(key, read_options, move |key, value| {
995 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
996 })
997 .await
998 }
999 }
1000
1001 #[async_trait::async_trait]
1002 impl<S: StateStoreRead> DynStateStoreRead for S {
1003 async fn iter(
1004 &self,
1005 key_range: TableKeyRange,
1006
1007 read_options: ReadOptions,
1008 ) -> StorageResult<BoxStateStoreReadIter> {
1009 Ok(Box::new(self.iter(key_range, read_options).await?))
1010 }
1011
1012 async fn rev_iter(
1013 &self,
1014 key_range: TableKeyRange,
1015
1016 read_options: ReadOptions,
1017 ) -> StorageResult<BoxStateStoreReadIter> {
1018 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1019 }
1020 }
1021
1022 #[async_trait::async_trait]
1023 impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1024 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1025 self.next_epoch(epoch, options).await
1026 }
1027
1028 async fn iter_log(
1029 &self,
1030 epoch_range: (u64, u64),
1031 key_range: TableKeyRange,
1032 options: ReadLogOptions,
1033 ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1034 Ok(Box::new(
1035 self.iter_log(epoch_range, key_range, options).await?,
1036 ))
1037 }
1038 }
1039
1040 pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
1042 #[async_trait::async_trait]
1043 pub trait DynLocalStateStore:
1044 DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
1045 {
1046 async fn iter(
1047 &self,
1048 key_range: TableKeyRange,
1049 read_options: ReadOptions,
1050 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1051
1052 async fn rev_iter(
1053 &self,
1054 key_range: TableKeyRange,
1055 read_options: ReadOptions,
1056 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;
1057
1058 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;
1059
1060 fn insert(
1061 &mut self,
1062 key: TableKey<Bytes>,
1063 new_val: Bytes,
1064 old_val: Option<Bytes>,
1065 ) -> StorageResult<()>;
1066
1067 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
1068
1069 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;
1070
1071 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
1072 }
1073
1074 #[async_trait::async_trait]
1075 pub trait DynStateStoreWriteEpochControl: StaticSendSync {
1076 async fn flush(&mut self) -> StorageResult<usize>;
1077
1078 async fn try_flush(&mut self) -> StorageResult<()>;
1079
1080 async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;
1081
1082 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
1083 }
1084
1085 #[async_trait::async_trait]
1086 impl<S: LocalStateStore> DynLocalStateStore for S {
1087 async fn iter(
1088 &self,
1089 key_range: TableKeyRange,
1090 read_options: ReadOptions,
1091 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1092 Ok(Box::new(self.iter(key_range, read_options).await?))
1093 }
1094
1095 async fn rev_iter(
1096 &self,
1097 key_range: TableKeyRange,
1098 read_options: ReadOptions,
1099 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1100 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1101 }
1102
1103 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1104 StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1105 }
1106
1107 fn insert(
1108 &mut self,
1109 key: TableKey<Bytes>,
1110 new_val: Bytes,
1111 old_val: Option<Bytes>,
1112 ) -> StorageResult<()> {
1113 self.insert(key, new_val, old_val)
1114 }
1115
1116 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1117 self.delete(key, old_val)
1118 }
1119
1120 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1121 self.update_vnode_bitmap(vnodes).await
1122 }
1123
1124 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1125 self.get_table_watermark(vnode)
1126 }
1127 }
1128
1129 #[async_trait::async_trait]
1130 impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1131 async fn flush(&mut self) -> StorageResult<usize> {
1132 self.flush().await
1133 }
1134
1135 async fn try_flush(&mut self) -> StorageResult<()> {
1136 self.try_flush().await
1137 }
1138
1139 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1140 self.init(options).await
1141 }
1142
1143 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1144 self.seal_current_epoch(next_epoch, opts)
1145 }
1146 }
1147
1148 pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1149
1150 impl LocalStateStore for BoxDynLocalStateStore {
1151 type FlushedSnapshotReader = StateStoreReadDynRef;
1152 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1153 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1154
1155 fn iter(
1156 &self,
1157 key_range: TableKeyRange,
1158 read_options: ReadOptions,
1159 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1160 (*self.0).iter(key_range, read_options)
1161 }
1162
1163 fn rev_iter(
1164 &self,
1165 key_range: TableKeyRange,
1166 read_options: ReadOptions,
1167 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1168 (*self.0).rev_iter(key_range, read_options)
1169 }
1170
1171 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1172 (*self.0).new_flushed_snapshot_reader()
1173 }
1174
1175 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1176 (*self.0).get_table_watermark(vnode)
1177 }
1178
1179 fn insert(
1180 &mut self,
1181 key: TableKey<Bytes>,
1182 new_val: Bytes,
1183 old_val: Option<Bytes>,
1184 ) -> StorageResult<()> {
1185 (*self.0).insert(key, new_val, old_val)
1186 }
1187
1188 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1189 (*self.0).delete(key, old_val)
1190 }
1191
1192 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1193 (*self.0).update_vnode_bitmap(vnodes).await
1194 }
1195 }
1196
1197 impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1198 where
1199 StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1200 {
1201 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1202 self.as_mut().flush()
1203 }
1204
1205 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1206 self.as_mut().try_flush()
1207 }
1208
1209 fn init(
1210 &mut self,
1211 options: InitOptions,
1212 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1213 self.as_mut().init(options)
1214 }
1215
1216 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1217 self.as_mut().seal_current_epoch(next_epoch, opts)
1218 }
1219 }
1220
1221 #[async_trait::async_trait]
1222 pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1223 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
1224 }
1225
1226 #[async_trait::async_trait]
1227 impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1228 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1229 self.insert(vec, info)
1230 }
1231 }
1232
1233 pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1234
1235 impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1236 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1237 self.0.insert(vec, info)
1238 }
1239 }
1240
1241 #[async_trait::async_trait]
1244 pub trait DynStateStoreReadVector: StaticSendSync {
1245 async fn nearest(
1246 &self,
1247 vec: Vector,
1248 options: VectorNearestOptions,
1249 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1250 }
1251
1252 #[async_trait::async_trait]
1253 impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1254 async fn nearest(
1255 &self,
1256 vec: Vector,
1257 options: VectorNearestOptions,
1258 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1259 self.nearest(vec, options, |vec, distance, info| {
1260 (
1261 Vector::clone_from_ref(vec),
1262 distance,
1263 Bytes::copy_from_slice(info),
1264 )
1265 })
1266 .await
1267 }
1268 }
1269
1270 impl<P> StateStoreReadVector for StateStorePointer<P>
1271 where
1272 StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1273 {
1274 async fn nearest<O: Send + 'static>(
1275 &self,
1276 vec: Vector,
1277 options: VectorNearestOptions,
1278 on_nearest_item_fn: impl OnNearestItemFn<O>,
1279 ) -> StorageResult<Vec<O>> {
1280 let output = self.as_ref().nearest(vec, options).await?;
1281 Ok(output
1282 .into_iter()
1283 .map(|(vec, distance, info)| {
1284 on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1285 })
1286 .collect())
1287 }
1288 }
1289
1290 pub trait DynStateStoreReadSnapshot:
1291 DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1292 {
1293 }
1294
1295 impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1296 for S
1297 {
1298 }
1299
1300 pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1301 #[async_trait::async_trait]
1302 pub trait DynStateStoreExt: StaticSendSync {
1303 async fn try_wait_epoch(
1304 &self,
1305 epoch: HummockReadEpoch,
1306 options: TryWaitEpochOptions,
1307 ) -> StorageResult<()>;
1308
1309 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1310 async fn new_read_snapshot(
1311 &self,
1312 epoch: HummockReadEpoch,
1313 options: NewReadSnapshotOptions,
1314 ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1315 async fn new_vector_writer(
1316 &self,
1317 options: NewVectorWriterOptions,
1318 ) -> BoxDynStateStoreWriteVector;
1319 }
1320
1321 #[async_trait::async_trait]
1322 impl<S: StateStore> DynStateStoreExt for S {
1323 async fn try_wait_epoch(
1324 &self,
1325 epoch: HummockReadEpoch,
1326 options: TryWaitEpochOptions,
1327 ) -> StorageResult<()> {
1328 self.try_wait_epoch(epoch, options).await
1329 }
1330
1331 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1332 StateStorePointer(Box::new(self.new_local(option).await))
1333 }
1334
1335 async fn new_read_snapshot(
1336 &self,
1337 epoch: HummockReadEpoch,
1338 options: NewReadSnapshotOptions,
1339 ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1340 Ok(StateStorePointer(Arc::new(
1341 self.new_read_snapshot(epoch, options).await?,
1342 )))
1343 }
1344
1345 async fn new_vector_writer(
1346 &self,
1347 options: NewVectorWriterOptions,
1348 ) -> BoxDynStateStoreWriteVector {
1349 StateStorePointer(Box::new(self.new_vector_writer(options).await))
1350 }
1351 }
1352
1353 pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1354
1355 macro_rules! state_store_pointer_dyn_as_ref {
1356 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1357 impl AsRef<dyn $target_dyn_trait>
1358 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1359 {
1360 fn as_ref(&self) -> &dyn $target_dyn_trait {
1361 (&*self.0) as _
1362 }
1363 }
1364 };
1365 }
1366
1367 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1368 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1369 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1370 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1371 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1372 state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1373
1374 macro_rules! state_store_pointer_dyn_as_mut {
1375 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1376 impl AsMut<dyn $target_dyn_trait>
1377 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1378 {
1379 fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1380 (&mut *self.0) as _
1381 }
1382 }
1383 };
1384 }
1385
1386 state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1387 state_store_pointer_dyn_as_mut!(
1388 Box<dyn DynStateStoreWriteVector>,
1389 DynStateStoreWriteEpochControl
1390 );
1391
1392 #[derive(Clone)]
1393 pub struct StateStorePointer<P>(pub(crate) P);
1394
1395 impl<P> StateStoreGet for StateStorePointer<P>
1396 where
1397 StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1398 {
1399 async fn on_key_value<O: Send + 'static>(
1400 &self,
1401 key: TableKey<Bytes>,
1402 read_options: ReadOptions,
1403 on_key_value_fn: impl KeyValueFn<O>,
1404 ) -> StorageResult<Option<O>> {
1405 let option = self.as_ref().get_keyed_row(key, read_options).await?;
1406 option
1407 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1408 .transpose()
1409 }
1410 }
1411
1412 impl<P> StateStoreRead for StateStorePointer<P>
1413 where
1414 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1415 {
1416 type Iter = BoxStateStoreReadIter;
1417 type RevIter = BoxStateStoreReadIter;
1418
1419 fn iter(
1420 &self,
1421 key_range: TableKeyRange,
1422
1423 read_options: ReadOptions,
1424 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1425 self.as_ref().iter(key_range, read_options)
1426 }
1427
1428 fn rev_iter(
1429 &self,
1430 key_range: TableKeyRange,
1431
1432 read_options: ReadOptions,
1433 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1434 self.as_ref().rev_iter(key_range, read_options)
1435 }
1436 }
1437
1438 impl StateStoreReadLog for StateStoreDynRef {
1439 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1440
1441 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1442 (*self.0).next_epoch(epoch, options).await
1443 }
1444
1445 fn iter_log(
1446 &self,
1447 epoch_range: (u64, u64),
1448 key_range: TableKeyRange,
1449 options: ReadLogOptions,
1450 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1451 (*self.0).iter_log(epoch_range, key_range, options)
1452 }
1453 }
1454
1455 pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1456
1457 impl AsHummock for StateStoreDynRef {
1458 fn as_hummock(&self) -> Option<&HummockStorage> {
1459 (*self.0).as_hummock()
1460 }
1461 }
1462
1463 impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1464
1465 impl StateStore for StateStoreDynRef {
1466 type Local = BoxDynLocalStateStore;
1467 type ReadSnapshot = StateStoreReadSnapshotDynRef;
1468 type VectorWriter = BoxDynStateStoreWriteVector;
1469
1470 fn try_wait_epoch(
1471 &self,
1472 epoch: HummockReadEpoch,
1473 options: TryWaitEpochOptions,
1474 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1475 (*self.0).try_wait_epoch(epoch, options)
1476 }
1477
1478 fn new_local(
1479 &self,
1480 option: NewLocalOptions,
1481 ) -> impl Future<Output = Self::Local> + Send + '_ {
1482 (*self.0).new_local(option)
1483 }
1484
1485 async fn new_read_snapshot(
1486 &self,
1487 epoch: HummockReadEpoch,
1488 options: NewReadSnapshotOptions,
1489 ) -> StorageResult<Self::ReadSnapshot> {
1490 (*self.0).new_read_snapshot(epoch, options).await
1491 }
1492
1493 fn new_vector_writer(
1494 &self,
1495 options: NewVectorWriterOptions,
1496 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1497 (*self.0).new_vector_writer(options)
1498 }
1499 }
1500}