1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{DirectFsDeviceOptions, Engine, FifoPicker, HybridCacheBuilder, LargeEngineOptions};
22use futures::FutureExt;
23use futures::future::BoxFuture;
24use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
25use risingwave_common::catalog::TableId;
26use risingwave_common::license::Feature;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use risingwave_common_service::RpcNotificationClient;
29use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
30use risingwave_object_store::object::build_remote_object_store;
31use thiserror_ext::AsReport;
32
33use crate::StateStore;
34use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
35use crate::error::StorageResult;
36use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
37use crate::hummock::{
38 Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
39 SstableBlockIndex, SstableStore, SstableStoreConfig,
40};
41use crate::memory::MemoryStateStore;
42use crate::memory::sled::SledStateStore;
43use crate::monitor::{
44 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
45 ObjectStoreMetrics,
46};
47use crate::opts::StorageOpts;
48
49static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
50 Box::new(PrometheusMetricsRegistry::new(
51 GLOBAL_METRICS_REGISTRY.clone(),
52 ))
53});
54
// Private module grouping the opaque `impl Trait` aliases with the functions
// that produce values of those types.
mod opaque_type {
    use super::*;

    // Each alias hides the concrete type returned by the matching constructor
    // function below.
    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Wraps a [`MemoryStateStore`] with `may_dynamic_dispatch`.
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Wraps a [`HummockStorage`] with `may_verify`, then `may_dynamic_dispatch`.
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Wraps a [`SledStateStore`] with `may_dynamic_dispatch`.
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
74pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
75use opaque_type::{hummock, in_memory, sled};
76
/// Wrapper type applied to every backend store. With the `hm-trace` feature the
/// store is additionally wrapped in a `TracedStateStore` beneath the monitoring layer.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Wrapper type applied to every backend store. Without the `hm-trace` feature
/// it is just the monitoring layer around the store.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
82
83fn monitored<S: StateStore>(
84 state_store: S,
85 storage_metrics: Arc<MonitoredStorageMetrics>,
86) -> Monitored<S> {
87 let inner = {
88 #[cfg(feature = "hm-trace")]
89 {
90 crate::monitor::traced_store::TracedStateStore::new_global(state_store)
91 }
92 #[cfg(not(feature = "hm-trace"))]
93 {
94 state_store
95 }
96 };
97 inner.monitored(storage_metrics)
98}
99
100fn inner<S>(state_store: &Monitored<S>) -> &S {
101 let inner = state_store.inner();
102 {
103 #[cfg(feature = "hm-trace")]
104 {
105 inner.inner()
106 }
107 #[cfg(not(feature = "hm-trace"))]
108 {
109 inner
110 }
111 }
112}
113
/// The state store backend in use. Every variant holds its store wrapped in
/// [`Monitored`].
#[derive(Clone, EnumAsInner)]
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// Store built from a `HummockStorage`.
    HummockStateStore(Monitored<HummockStorageType>),
    /// Store built from a `MemoryStateStore`. `dispatch_state_store!` refuses to
    /// run its body for this variant when `debug_assertions` is off.
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// Store built from a `SledStateStore`. `dispatch_state_store!` refuses to
    /// run its body for this variant when `debug_assertions` is off.
    SledStateStore(Monitored<SledStateStoreType>),
}
134
135fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
136 #[cfg(not(debug_assertions))]
137 {
138 state_store
139 }
140 #[cfg(debug_assertions)]
141 {
142 use crate::store_impl::dyn_state_store::StateStorePointer;
143 StateStorePointer(Arc::new(state_store) as _)
144 }
145}
146
147fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
148 #[cfg(not(debug_assertions))]
149 {
150 state_store
151 }
152 #[cfg(debug_assertions)]
153 {
154 use std::marker::PhantomData;
155
156 use risingwave_common::util::env_var::env_var_is_true;
157 use tracing::info;
158
159 use crate::store_impl::verify::VerifyStateStore;
160
161 let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
162 info!("enable verify state store");
163 Some(SledStateStore::new_temp())
164 } else {
165 info!("verify state store is not enabled");
166 None
167 };
168 VerifyStateStore {
169 actual: state_store,
170 expected,
171 _phantom: PhantomData::<()>,
172 }
173 }
174}
175
176impl StateStoreImpl {
177 fn in_memory(
178 state_store: MemoryStateStore,
179 storage_metrics: Arc<MonitoredStorageMetrics>,
180 ) -> Self {
181 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
183 }
184
185 pub fn hummock(
186 state_store: HummockStorage,
187 storage_metrics: Arc<MonitoredStorageMetrics>,
188 ) -> Self {
189 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
191 }
192
193 pub fn sled(
194 state_store: SledStateStore,
195 storage_metrics: Arc<MonitoredStorageMetrics>,
196 ) -> Self {
197 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
198 }
199
200 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
201 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
202 }
203
204 pub fn for_test() -> Self {
205 Self::in_memory(
206 MemoryStateStore::new(),
207 Arc::new(MonitoredStorageMetrics::unused()),
208 )
209 }
210
211 pub fn as_hummock(&self) -> Option<&HummockStorage> {
212 match self {
213 StateStoreImpl::HummockStateStore(hummock) => {
214 Some(inner(hummock).as_hummock().expect("should be hummock"))
215 }
216 _ => None,
217 }
218 }
219}
220
221impl Debug for StateStoreImpl {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 match self {
224 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
225 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
226 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
227 }
228 }
229}
230
/// Matches on a `StateStoreImpl` expression, binds the wrapped store of the
/// matched variant to `$store`, and evaluates `$body` with that binding.
///
/// For the memory and sled variants, `$body` is only compiled when
/// `debug_assertions` is enabled; otherwise those arms hit `unimplemented!`.
/// The Hummock arm always evaluates `$body`.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Presumably consumes the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Presumably consumes the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
269
270#[cfg(any(debug_assertions, test, feature = "test"))]
271pub mod verify {
272 use std::fmt::Debug;
273 use std::future::Future;
274 use std::marker::PhantomData;
275 use std::ops::Deref;
276 use std::sync::Arc;
277
278 use bytes::Bytes;
279 use risingwave_common::bitmap::Bitmap;
280 use risingwave_common::hash::VirtualNode;
281 use risingwave_hummock_sdk::HummockReadEpoch;
282 use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
283 use tracing::log::warn;
284
285 use crate::error::StorageResult;
286 use crate::hummock::HummockStorage;
287 use crate::store::*;
288 use crate::store_impl::AsHummock;
289
290 #[expect(dead_code)]
291 fn assert_result_eq<Item: PartialEq + Debug, E>(
292 first: &std::result::Result<Item, E>,
293 second: &std::result::Result<Item, E>,
294 ) {
295 match (first, second) {
296 (Ok(first), Ok(second)) => {
297 if first != second {
298 warn!("result different: {:?} {:?}", first, second);
299 }
300 assert_eq!(first, second);
301 }
302 (Err(_), Err(_)) => {}
303 _ => {
304 warn!("one success and one failed");
305 panic!("result not equal");
306 }
307 }
308 }
309
    /// Pairs an `actual` value (a store, a local store, a snapshot or an iterator)
    /// with an optional `expected` counterpart.
    ///
    /// The trait impls in this module apply operations to `actual`; when
    /// `expected` is `Some`, most of them also apply the operation to `expected`
    /// and assert that read results match.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        /// The side whose results are returned to the caller.
        pub actual: A,
        /// Optional reference side; `None` disables the comparison.
        pub expected: Option<E>,
        /// Carries the item type `T` for the iterator impl; `()` for stores.
        pub _phantom: PhantomData<T>,
    }
316
    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        /// Delegates to the `actual` side only; `expected` is ignored.
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }
322
323 impl<A: StateStoreGet, E: StateStoreGet> StateStoreGet for VerifyStateStore<A, E> {
324 async fn on_key_value<O: Send + 'static>(
325 &self,
326 key: TableKey<Bytes>,
327 read_options: ReadOptions,
328 on_key_value_fn: impl KeyValueFn<O>,
329 ) -> StorageResult<Option<O>> {
330 let actual: Option<(FullKey<Bytes>, Bytes)> = self
331 .actual
332 .on_key_value(key.clone(), read_options.clone(), |key, value| {
333 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
334 })
335 .await?;
336 if let Some(expected) = &self.expected {
337 let expected: Option<(FullKey<Bytes>, Bytes)> = expected
338 .on_key_value(key, read_options, |key, value| {
339 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
340 })
341 .await?;
342 assert_eq!(
343 actual
344 .as_ref()
345 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item)),
346 expected
347 .as_ref()
348 .map(|item| (item.0.epoch_with_gap.pure_epoch(), item))
349 );
350 }
351
352 actual
353 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
354 .transpose()
355 }
356 }
357
    impl<A: StateStoreReadVector, E: StateStoreReadVector> StateStoreReadVector
        for VerifyStateStore<A, E>
    {
        /// Forwards the nearest-vector query to `actual` only; `expected` is not
        /// consulted and nothing is compared.
        fn nearest<O: Send + 'static>(
            &self,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> impl StorageFuture<'_, Vec<O>> {
            self.actual.nearest(vec, options, on_nearest_item_fn)
        }
    }
370
371 impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
372 type Iter = impl StateStoreReadIter;
373 type RevIter = impl StateStoreReadIter;
374
375 #[expect(clippy::manual_async_fn)]
378 fn iter(
379 &self,
380 key_range: TableKeyRange,
381
382 read_options: ReadOptions,
383 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
384 async move {
385 let actual = self
386 .actual
387 .iter(key_range.clone(), read_options.clone())
388 .await?;
389 let expected = if let Some(expected) = &self.expected {
390 Some(expected.iter(key_range, read_options).await?)
391 } else {
392 None
393 };
394
395 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
396 }
397 }
398
399 #[expect(clippy::manual_async_fn)]
400 fn rev_iter(
401 &self,
402 key_range: TableKeyRange,
403
404 read_options: ReadOptions,
405 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
406 async move {
407 let actual = self
408 .actual
409 .rev_iter(key_range.clone(), read_options.clone())
410 .await?;
411 let expected = if let Some(expected) = &self.expected {
412 Some(expected.rev_iter(key_range, read_options).await?)
413 } else {
414 None
415 };
416
417 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
418 }
419 }
420 }
421
422 impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
423 type ChangeLogIter = impl StateStoreReadChangeLogIter;
424
425 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
426 let actual = self.actual.next_epoch(epoch, options.clone()).await?;
427 if let Some(expected) = &self.expected {
428 assert_eq!(actual, expected.next_epoch(epoch, options).await?);
429 }
430 Ok(actual)
431 }
432
433 async fn iter_log(
434 &self,
435 epoch_range: (u64, u64),
436 key_range: TableKeyRange,
437 options: ReadLogOptions,
438 ) -> StorageResult<Self::ChangeLogIter> {
439 let actual = self
440 .actual
441 .iter_log(epoch_range, key_range.clone(), options.clone())
442 .await?;
443 let expected = if let Some(expected) = &self.expected {
444 Some(expected.iter_log(epoch_range, key_range, options).await?)
445 } else {
446 None
447 };
448
449 Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
450 }
451 }
452
453 impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
454 for VerifyStateStore<A, E, T>
455 where
456 for<'a> T::ItemRef<'a>: PartialEq + Debug,
457 {
458 async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
459 let actual = self.actual.try_next().await?;
460 if let Some(expected) = self.expected.as_mut() {
461 let expected = expected.try_next().await?;
462 assert_eq!(actual, expected);
463 }
464 Ok(actual)
465 }
466 }
467
    /// Pairs an `actual` iterator with an optional `expected` one in a
    /// `VerifyStateStore` tagged with the item type `T`, and returns it as an
    /// opaque iterator.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }
481
482 impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
483 type FlushedSnapshotReader =
484 VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;
485
486 type Iter<'a> = impl StateStoreIter + 'a;
487 type RevIter<'a> = impl StateStoreIter + 'a;
488
489 #[expect(clippy::manual_async_fn)]
490 fn iter(
491 &self,
492 key_range: TableKeyRange,
493 read_options: ReadOptions,
494 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
495 async move {
496 let actual = self
497 .actual
498 .iter(key_range.clone(), read_options.clone())
499 .await?;
500 let expected = if let Some(expected) = &self.expected {
501 Some(expected.iter(key_range, read_options).await?)
502 } else {
503 None
504 };
505
506 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
507 }
508 }
509
510 #[expect(clippy::manual_async_fn)]
511 fn rev_iter(
512 &self,
513 key_range: TableKeyRange,
514 read_options: ReadOptions,
515 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
516 async move {
517 let actual = self
518 .actual
519 .rev_iter(key_range.clone(), read_options.clone())
520 .await?;
521 let expected = if let Some(expected) = &self.expected {
522 Some(expected.rev_iter(key_range, read_options).await?)
523 } else {
524 None
525 };
526
527 Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
528 }
529 }
530
531 fn insert(
532 &mut self,
533 key: TableKey<Bytes>,
534 new_val: Bytes,
535 old_val: Option<Bytes>,
536 ) -> StorageResult<()> {
537 if let Some(expected) = &mut self.expected {
538 expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
539 }
540 self.actual.insert(key, new_val, old_val)?;
541
542 Ok(())
543 }
544
545 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
546 if let Some(expected) = &mut self.expected {
547 expected.delete(key.clone(), old_val.clone())?;
548 }
549 self.actual.delete(key, old_val)?;
550 Ok(())
551 }
552
553 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
554 let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
555 if let Some(expected) = &mut self.expected {
556 assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
557 }
558 Ok(ret)
559 }
560
561 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
562 let ret = self.actual.get_table_watermark(vnode);
563 if let Some(expected) = &self.expected {
564 assert_eq!(ret, expected.get_table_watermark(vnode));
565 }
566 ret
567 }
568
569 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
570 VerifyStateStore {
571 actual: self.actual.new_flushed_snapshot_reader(),
572 expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
573 _phantom: Default::default(),
574 }
575 }
576 }
577
578 impl<A: StateStoreWriteEpochControl, E: StateStoreWriteEpochControl> StateStoreWriteEpochControl
579 for VerifyStateStore<A, E>
580 {
581 async fn flush(&mut self) -> StorageResult<usize> {
582 if let Some(expected) = &mut self.expected {
583 expected.flush().await?;
584 }
585 self.actual.flush().await
586 }
587
588 async fn try_flush(&mut self) -> StorageResult<()> {
589 if let Some(expected) = &mut self.expected {
590 expected.try_flush().await?;
591 }
592 self.actual.try_flush().await
593 }
594
595 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
596 self.actual.init(options.clone()).await?;
597 if let Some(expected) = &mut self.expected {
598 expected.init(options).await?;
599 }
600 Ok(())
601 }
602
603 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
604 if let Some(expected) = &mut self.expected {
605 expected.seal_current_epoch(next_epoch, opts.clone());
606 }
607 self.actual.seal_current_epoch(next_epoch, opts);
608 }
609 }
610
611 impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
612 type Local = VerifyStateStore<A::Local, E::Local>;
613 type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;
614 type VectorWriter = A::VectorWriter;
615
616 fn try_wait_epoch(
617 &self,
618 epoch: HummockReadEpoch,
619 options: TryWaitEpochOptions,
620 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
621 self.actual.try_wait_epoch(epoch, options)
622 }
623
624 async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
625 let expected = if let Some(expected) = &self.expected {
626 Some(expected.new_local(option.clone()).await)
627 } else {
628 None
629 };
630 VerifyStateStore {
631 actual: self.actual.new_local(option).await,
632 expected,
633 _phantom: PhantomData::<()>,
634 }
635 }
636
637 async fn new_read_snapshot(
638 &self,
639 epoch: HummockReadEpoch,
640 options: NewReadSnapshotOptions,
641 ) -> StorageResult<Self::ReadSnapshot> {
642 let expected = if let Some(expected) = &self.expected {
643 Some(expected.new_read_snapshot(epoch, options).await?)
644 } else {
645 None
646 };
647 Ok(VerifyStateStore {
648 actual: self.actual.new_read_snapshot(epoch, options).await?,
649 expected,
650 _phantom: PhantomData::<()>,
651 })
652 }
653
654 fn new_vector_writer(
655 &self,
656 options: NewVectorWriterOptions,
657 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
658 self.actual.new_vector_writer(options)
659 }
660 }
661
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        /// Dereferences to the `actual` side.
        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
669}
670
671impl StateStoreImpl {
672 #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
673 #[allow(clippy::too_many_arguments)]
674 #[expect(clippy::borrowed_box)]
675 pub async fn new(
676 s: &str,
677 opts: Arc<StorageOpts>,
678 hummock_meta_client: Arc<MonitoredHummockMetaClient>,
679 state_store_metrics: Arc<HummockStateStoreMetrics>,
680 object_store_metrics: Arc<ObjectStoreMetrics>,
681 storage_metrics: Arc<MonitoredStorageMetrics>,
682 compactor_metrics: Arc<CompactorMetrics>,
683 await_tree_config: Option<await_tree::Config>,
684 use_new_object_prefix_strategy: bool,
685 ) -> StorageResult<Self> {
686 const KB: usize = 1 << 10;
687 const MB: usize = 1 << 20;
688
689 let meta_cache = {
690 let mut builder = HybridCacheBuilder::new()
691 .with_name("foyer.meta")
692 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
693 .memory(opts.meta_cache_capacity_mb * MB)
694 .with_shards(opts.meta_cache_shard_num)
695 .with_eviction_config(opts.meta_cache_eviction_config.clone())
696 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
697 u64::BITS as usize / 8 + value.estimate_size()
698 })
699 .storage(Engine::Large(
700 LargeEngineOptions::new()
701 .with_indexer_shards(opts.meta_file_cache_indexer_shards)
702 .with_flushers(opts.meta_file_cache_flushers)
703 .with_reclaimers(opts.meta_file_cache_reclaimers)
704 .with_buffer_pool_size(opts.meta_file_cache_flush_buffer_threshold_mb * MB) .with_clean_region_threshold(
706 opts.meta_file_cache_reclaimers + opts.meta_file_cache_reclaimers / 2,
707 )
708 .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
709 .with_blob_index_size(16 * KB)
710 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
711 opts.meta_file_cache_fifo_probation_ratio,
712 ))]),
713 ));
714
715 if !opts.meta_file_cache_dir.is_empty() {
716 if let Err(e) = Feature::ElasticDiskCache.check_available() {
717 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
718 } else {
719 builder = builder
720 .with_device_options(
721 DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
722 .with_capacity(opts.meta_file_cache_capacity_mb * MB)
723 .with_file_size(opts.meta_file_cache_file_capacity_mb * MB)
724 .with_throttle(opts.meta_file_cache_throttle.clone()),
725 )
726 .with_recover_mode(opts.meta_file_cache_recover_mode)
727 .with_compression(opts.meta_file_cache_compression)
728 .with_runtime_options(opts.meta_file_cache_runtime_config.clone());
729 }
730 }
731
732 builder.build().await.map_err(HummockError::foyer_error)?
733 };
734
735 let block_cache = {
736 let mut builder = HybridCacheBuilder::new()
737 .with_name("foyer.data")
738 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
739 .with_event_listener(Arc::new(BlockCacheEventListener::new(
740 state_store_metrics.clone(),
741 )))
742 .memory(opts.block_cache_capacity_mb * MB)
743 .with_shards(opts.block_cache_shard_num)
744 .with_eviction_config(opts.block_cache_eviction_config.clone())
745 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
746 u64::BITS as usize * 2 / 8 + value.raw().len()
748 })
749 .storage(Engine::Large(
750 LargeEngineOptions::new()
751 .with_indexer_shards(opts.data_file_cache_indexer_shards)
752 .with_flushers(opts.data_file_cache_flushers)
753 .with_reclaimers(opts.data_file_cache_reclaimers)
754 .with_buffer_pool_size(opts.data_file_cache_flush_buffer_threshold_mb * MB) .with_clean_region_threshold(
756 opts.data_file_cache_reclaimers + opts.data_file_cache_reclaimers / 2,
757 )
758 .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
759 .with_blob_index_size(16 * KB)
760 .with_eviction_pickers(vec![Box::new(FifoPicker::new(
761 opts.data_file_cache_fifo_probation_ratio,
762 ))]),
763 ));
764
765 if !opts.data_file_cache_dir.is_empty() {
766 if let Err(e) = Feature::ElasticDiskCache.check_available() {
767 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
768 } else {
769 builder = builder
770 .with_device_options(
771 DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
772 .with_capacity(opts.data_file_cache_capacity_mb * MB)
773 .with_file_size(opts.data_file_cache_file_capacity_mb * MB)
774 .with_throttle(opts.data_file_cache_throttle.clone()),
775 )
776 .with_recover_mode(opts.data_file_cache_recover_mode)
777 .with_compression(opts.data_file_cache_compression)
778 .with_runtime_options(opts.data_file_cache_runtime_config.clone());
779 }
780 }
781
782 builder.build().await.map_err(HummockError::foyer_error)?
783 };
784
785 let recent_filter = if opts.data_file_cache_dir.is_empty() {
786 None
787 } else {
788 Some(Arc::new(RecentFilter::new(
789 opts.cache_refill_recent_filter_layers,
790 Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
791 )))
792 };
793
794 let store = match s {
795 hummock if hummock.starts_with("hummock+") => {
796 let object_store = build_remote_object_store(
797 hummock.strip_prefix("hummock+").unwrap(),
798 object_store_metrics.clone(),
799 "Hummock",
800 Arc::new(opts.object_store_config.clone()),
801 )
802 .await;
803
804 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
805 store: Arc::new(object_store),
806 path: opts.data_directory.clone(),
807 prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
808 max_prefetch_block_number: opts.max_prefetch_block_number,
809 recent_filter,
810 state_store_metrics: state_store_metrics.clone(),
811 use_new_object_prefix_strategy,
812
813 meta_cache,
814 block_cache,
815 }));
816 let notification_client =
817 RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
818 let compaction_catalog_manager_ref =
819 Arc::new(CompactionCatalogManager::new(Box::new(
820 RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
821 )));
822
823 let inner = HummockStorage::new(
824 opts.clone(),
825 sstable_store,
826 hummock_meta_client.clone(),
827 notification_client,
828 compaction_catalog_manager_ref,
829 state_store_metrics.clone(),
830 compactor_metrics.clone(),
831 await_tree_config,
832 )
833 .await?;
834
835 StateStoreImpl::hummock(inner, storage_metrics)
836 }
837
838 "in_memory" | "in-memory" => {
839 tracing::warn!(
840 "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
841 );
842 StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
843 }
844
845 sled if sled.starts_with("sled://") => {
846 tracing::warn!(
847 "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
848 );
849 let path = sled.strip_prefix("sled://").unwrap();
850 StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
851 }
852
853 other => unimplemented!("{} state store is not supported", other),
854 };
855
856 Ok(store)
857 }
858}
859
860pub trait AsHummock: Send + Sync {
861 fn as_hummock(&self) -> Option<&HummockStorage>;
862
863 fn sync(
864 &self,
865 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
866 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
867 async move {
868 if let Some(hummock) = self.as_hummock() {
869 hummock.sync(sync_table_epochs).await
870 } else {
871 Ok(SyncResult::default())
872 }
873 }
874 .boxed()
875 }
876}
877
impl AsHummock for HummockStorage {
    /// A `HummockStorage` is its own Hummock storage.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        Some(self)
    }
}

impl AsHummock for MemoryStateStore {
    /// The in-memory store has no Hummock storage behind it.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}

impl AsHummock for SledStateStore {
    /// The sled store has no Hummock storage behind it.
    fn as_hummock(&self) -> Option<&HummockStorage> {
        None
    }
}
895
896#[cfg(debug_assertions)]
897mod dyn_state_store {
898 use std::future::Future;
899 use std::ops::DerefMut;
900 use std::sync::Arc;
901
902 use bytes::Bytes;
903 use risingwave_common::bitmap::Bitmap;
904 use risingwave_common::hash::VirtualNode;
905 use risingwave_hummock_sdk::HummockReadEpoch;
906 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
907
908 use crate::error::StorageResult;
909 use crate::hummock::HummockStorage;
910 use crate::store::*;
911 use crate::store_impl::AsHummock;
912 use crate::vector::VectorDistance;
913
    /// `async_trait` counterpart of `StateStoreIter`, so that an iterator can be
    /// boxed as a trait object (see `BoxStateStoreIter`).
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        /// Advances the iterator and returns the next item, or `None` at the end.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }
918
    /// Blanket impl: every `StateStoreIter<T>` is usable as a `DynStateStoreIter<T>`.
    #[async_trait::async_trait]
    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            // Delegates to the iterator's own `try_next`.
            self.try_next().await
        }
    }
925
    /// Boxed, type-erased iterator over items of type `T`.
    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    /// Lets the boxed trait object be used wherever a `StateStoreIter<T>` is required.
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            // Forward to the boxed `DynStateStoreIter`.
            self.deref_mut().try_next()
        }
    }
934
    /// Boxed `'static` iterator over `StateStoreKeyedRow` items.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// Boxed `'static` iterator over `StateStoreReadLogItem` items.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
939
    /// `async_trait` point-lookup interface that returns an owned keyed row
    /// instead of invoking a caller-supplied callback.
    #[async_trait::async_trait]
    pub trait DynStateStoreGet: StaticSendSync {
        /// Looks up `key` and returns the matching keyed row, if any.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;
    }
948
    /// `async_trait` read interface whose iterators are returned boxed.
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: DynStateStoreGet + StaticSendSync {
        /// Opens a boxed iterator over `key_range`.
        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        /// Opens a boxed reverse iterator over `key_range`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }
965
    /// `async_trait` change-log read interface whose iterator is returned boxed.
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        /// Returns the epoch following `epoch`, as reported by the store.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        /// Opens a boxed change-log iterator for `key_range` over `epoch_range`.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }
976
    /// Shared, type-erased read handle: a `StateStorePointer` around an
    /// `Arc<dyn DynStateStoreRead>`.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;
978
979 #[async_trait::async_trait]
980 impl<S: StateStoreGet> DynStateStoreGet for S {
981 async fn get_keyed_row(
982 &self,
983 key: TableKey<Bytes>,
984 read_options: ReadOptions,
985 ) -> StorageResult<Option<StateStoreKeyedRow>> {
986 self.on_key_value(key, read_options, move |key, value| {
987 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
988 })
989 .await
990 }
991 }
992
993 #[async_trait::async_trait]
994 impl<S: StateStoreRead> DynStateStoreRead for S {
995 async fn iter(
996 &self,
997 key_range: TableKeyRange,
998
999 read_options: ReadOptions,
1000 ) -> StorageResult<BoxStateStoreReadIter> {
1001 Ok(Box::new(self.iter(key_range, read_options).await?))
1002 }
1003
1004 async fn rev_iter(
1005 &self,
1006 key_range: TableKeyRange,
1007
1008 read_options: ReadOptions,
1009 ) -> StorageResult<BoxStateStoreReadIter> {
1010 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1011 }
1012 }
1013
1014 #[async_trait::async_trait]
1015 impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
1016 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1017 self.next_epoch(epoch, options).await
1018 }
1019
1020 async fn iter_log(
1021 &self,
1022 epoch_range: (u64, u64),
1023 key_range: TableKeyRange,
1024 options: ReadLogOptions,
1025 ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
1026 Ok(Box::new(
1027 self.iter_log(epoch_range, key_range, options).await?,
1028 ))
1029 }
1030 }
1031
    /// Boxed iterator over `StateStoreKeyedRow` items that may borrow for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// `async_trait` counterpart of `LocalStateStore` with boxed iterators and a
    /// type-erased flushed-snapshot reader.
    #[async_trait::async_trait]
    pub trait DynLocalStateStore:
        DynStateStoreGet + DynStateStoreWriteEpochControl + StaticSendSync
    {
        /// Opens a boxed iterator over `key_range`; it may borrow from `self`.
        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Opens a boxed reverse iterator over `key_range`; it may borrow from `self`.
        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        /// Creates a type-erased reader for the flushed snapshot.
        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        /// Inserts `new_val` at `key`; `old_val` is the previous value, if any is supplied.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        /// Deletes `key`; `old_val` is the value being removed.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        /// Installs a new vnode bitmap and returns a bitmap reported by the store
        /// (presumably the previous one; confirm against `LocalStateStore`).
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        /// Returns the table watermark recorded for `vnode`, if any.
        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }
1065
    /// `async_trait` counterpart of `StateStoreWriteEpochControl`.
    #[async_trait::async_trait]
    pub trait DynStateStoreWriteEpochControl: StaticSendSync {
        /// Flushes pending writes and returns a `usize` reported by the store
        /// (presumably the flushed size; confirm against the implementation).
        async fn flush(&mut self) -> StorageResult<usize>;

        /// Runs the store's `try_flush`.
        async fn try_flush(&mut self) -> StorageResult<()>;

        /// Initializes the store with the given options.
        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

        /// Seals the current epoch and moves on to `next_epoch`.
        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
    }
1076
1077 #[async_trait::async_trait]
1078 impl<S: LocalStateStore> DynLocalStateStore for S {
1079 async fn iter(
1080 &self,
1081 key_range: TableKeyRange,
1082 read_options: ReadOptions,
1083 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1084 Ok(Box::new(self.iter(key_range, read_options).await?))
1085 }
1086
1087 async fn rev_iter(
1088 &self,
1089 key_range: TableKeyRange,
1090 read_options: ReadOptions,
1091 ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
1092 Ok(Box::new(self.rev_iter(key_range, read_options).await?))
1093 }
1094
1095 fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
1096 StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
1097 }
1098
1099 fn insert(
1100 &mut self,
1101 key: TableKey<Bytes>,
1102 new_val: Bytes,
1103 old_val: Option<Bytes>,
1104 ) -> StorageResult<()> {
1105 self.insert(key, new_val, old_val)
1106 }
1107
1108 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1109 self.delete(key, old_val)
1110 }
1111
1112 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1113 self.update_vnode_bitmap(vnodes).await
1114 }
1115
1116 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1117 self.get_table_watermark(vnode)
1118 }
1119 }
1120
1121 #[async_trait::async_trait]
1122 impl<S: StateStoreWriteEpochControl> DynStateStoreWriteEpochControl for S {
1123 async fn flush(&mut self) -> StorageResult<usize> {
1124 self.flush().await
1125 }
1126
1127 async fn try_flush(&mut self) -> StorageResult<()> {
1128 self.try_flush().await
1129 }
1130
1131 async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
1132 self.init(options).await
1133 }
1134
1135 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1136 self.seal_current_epoch(next_epoch, opts)
1137 }
1138 }
1139
1140 pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1141
1142 impl LocalStateStore for BoxDynLocalStateStore {
1143 type FlushedSnapshotReader = StateStoreReadDynRef;
1144 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1145 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1146
1147 fn iter(
1148 &self,
1149 key_range: TableKeyRange,
1150 read_options: ReadOptions,
1151 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1152 (*self.0).iter(key_range, read_options)
1153 }
1154
1155 fn rev_iter(
1156 &self,
1157 key_range: TableKeyRange,
1158 read_options: ReadOptions,
1159 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1160 (*self.0).rev_iter(key_range, read_options)
1161 }
1162
1163 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1164 (*self.0).new_flushed_snapshot_reader()
1165 }
1166
1167 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1168 (*self.0).get_table_watermark(vnode)
1169 }
1170
1171 fn insert(
1172 &mut self,
1173 key: TableKey<Bytes>,
1174 new_val: Bytes,
1175 old_val: Option<Bytes>,
1176 ) -> StorageResult<()> {
1177 (*self.0).insert(key, new_val, old_val)
1178 }
1179
1180 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1181 (*self.0).delete(key, old_val)
1182 }
1183
1184 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1185 (*self.0).update_vnode_bitmap(vnodes).await
1186 }
1187 }
1188
1189 impl<P> StateStoreWriteEpochControl for StateStorePointer<P>
1190 where
1191 StateStorePointer<P>: AsMut<dyn DynStateStoreWriteEpochControl> + StaticSendSync,
1192 {
1193 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1194 self.as_mut().flush()
1195 }
1196
1197 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1198 self.as_mut().try_flush()
1199 }
1200
1201 fn init(
1202 &mut self,
1203 options: InitOptions,
1204 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1205 self.as_mut().init(options)
1206 }
1207
1208 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1209 self.as_mut().seal_current_epoch(next_epoch, opts)
1210 }
1211 }
1212
1213 #[async_trait::async_trait]
1214 pub trait DynStateStoreWriteVector: DynStateStoreWriteEpochControl + StaticSendSync {
1215 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()>;
1216 }
1217
1218 #[async_trait::async_trait]
1219 impl<S: StateStoreWriteVector> DynStateStoreWriteVector for S {
1220 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1221 self.insert(vec, info)
1222 }
1223 }
1224
1225 pub type BoxDynStateStoreWriteVector = StateStorePointer<Box<dyn DynStateStoreWriteVector>>;
1226
1227 impl StateStoreWriteVector for BoxDynStateStoreWriteVector {
1228 fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
1229 self.0.insert(vec, info)
1230 }
1231 }
1232
1233 #[async_trait::async_trait]
1236 pub trait DynStateStoreReadVector: StaticSendSync {
1237 async fn nearest(
1238 &self,
1239 vec: Vector,
1240 options: VectorNearestOptions,
1241 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>>;
1242 }
1243
1244 #[async_trait::async_trait]
1245 impl<S: StateStoreReadVector> DynStateStoreReadVector for S {
1246 async fn nearest(
1247 &self,
1248 vec: Vector,
1249 options: VectorNearestOptions,
1250 ) -> StorageResult<Vec<(Vector, VectorDistance, Bytes)>> {
1251 self.nearest(vec, options, |vec, distance, info| {
1252 (
1253 Vector::clone_from_ref(vec),
1254 distance,
1255 Bytes::copy_from_slice(info),
1256 )
1257 })
1258 .await
1259 }
1260 }
1261
1262 impl<P> StateStoreReadVector for StateStorePointer<P>
1263 where
1264 StateStorePointer<P>: AsRef<dyn DynStateStoreReadVector> + StaticSendSync,
1265 {
1266 async fn nearest<O: Send + 'static>(
1267 &self,
1268 vec: Vector,
1269 options: VectorNearestOptions,
1270 on_nearest_item_fn: impl OnNearestItemFn<O>,
1271 ) -> StorageResult<Vec<O>> {
1272 let output = self.as_ref().nearest(vec, options).await?;
1273 Ok(output
1274 .into_iter()
1275 .map(|(vec, distance, info)| {
1276 on_nearest_item_fn(vec.to_ref(), distance, info.as_ref())
1277 })
1278 .collect())
1279 }
1280 }
1281
1282 pub trait DynStateStoreReadSnapshot:
1283 DynStateStoreRead + DynStateStoreReadVector + StaticSendSync
1284 {
1285 }
1286
1287 impl<S: DynStateStoreRead + DynStateStoreReadVector + StaticSendSync> DynStateStoreReadSnapshot
1288 for S
1289 {
1290 }
1291
1292 pub type StateStoreReadSnapshotDynRef = StateStorePointer<Arc<dyn DynStateStoreReadSnapshot>>;
1293 #[async_trait::async_trait]
1294 pub trait DynStateStoreExt: StaticSendSync {
1295 async fn try_wait_epoch(
1296 &self,
1297 epoch: HummockReadEpoch,
1298 options: TryWaitEpochOptions,
1299 ) -> StorageResult<()>;
1300
1301 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
1302 async fn new_read_snapshot(
1303 &self,
1304 epoch: HummockReadEpoch,
1305 options: NewReadSnapshotOptions,
1306 ) -> StorageResult<StateStoreReadSnapshotDynRef>;
1307 async fn new_vector_writer(
1308 &self,
1309 options: NewVectorWriterOptions,
1310 ) -> BoxDynStateStoreWriteVector;
1311 }
1312
1313 #[async_trait::async_trait]
1314 impl<S: StateStore> DynStateStoreExt for S {
1315 async fn try_wait_epoch(
1316 &self,
1317 epoch: HummockReadEpoch,
1318 options: TryWaitEpochOptions,
1319 ) -> StorageResult<()> {
1320 self.try_wait_epoch(epoch, options).await
1321 }
1322
1323 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1324 StateStorePointer(Box::new(self.new_local(option).await))
1325 }
1326
1327 async fn new_read_snapshot(
1328 &self,
1329 epoch: HummockReadEpoch,
1330 options: NewReadSnapshotOptions,
1331 ) -> StorageResult<StateStoreReadSnapshotDynRef> {
1332 Ok(StateStorePointer(Arc::new(
1333 self.new_read_snapshot(epoch, options).await?,
1334 )))
1335 }
1336
1337 async fn new_vector_writer(
1338 &self,
1339 options: NewVectorWriterOptions,
1340 ) -> BoxDynStateStoreWriteVector {
1341 StateStorePointer(Box::new(self.new_vector_writer(options).await))
1342 }
1343 }
1344
1345 pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1346
1347 macro_rules! state_store_pointer_dyn_as_ref {
1348 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1349 impl AsRef<dyn $target_dyn_trait>
1350 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1351 {
1352 fn as_ref(&self) -> &dyn $target_dyn_trait {
1353 (&*self.0) as _
1354 }
1355 }
1356 };
1357 }
1358
1359 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreRead);
1360 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreGet);
1361 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreReadSnapshot>, DynStateStoreReadVector);
1362 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1363 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreGet);
1364 state_store_pointer_dyn_as_ref!(Box<dyn DynLocalStateStore>, DynStateStoreGet);
1365
1366 macro_rules! state_store_pointer_dyn_as_mut {
1367 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1368 impl AsMut<dyn $target_dyn_trait>
1369 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1370 {
1371 fn as_mut(&mut self) -> &mut dyn $target_dyn_trait {
1372 (&mut *self.0) as _
1373 }
1374 }
1375 };
1376 }
1377
1378 state_store_pointer_dyn_as_mut!(Box<dyn DynLocalStateStore>, DynStateStoreWriteEpochControl);
1379 state_store_pointer_dyn_as_mut!(
1380 Box<dyn DynStateStoreWriteVector>,
1381 DynStateStoreWriteEpochControl
1382 );
1383
1384 #[derive(Clone)]
1385 pub struct StateStorePointer<P>(pub(crate) P);
1386
1387 impl<P> StateStoreGet for StateStorePointer<P>
1388 where
1389 StateStorePointer<P>: AsRef<dyn DynStateStoreGet> + StaticSendSync,
1390 {
1391 async fn on_key_value<O: Send + 'static>(
1392 &self,
1393 key: TableKey<Bytes>,
1394 read_options: ReadOptions,
1395 on_key_value_fn: impl KeyValueFn<O>,
1396 ) -> StorageResult<Option<O>> {
1397 let option = self.as_ref().get_keyed_row(key, read_options).await?;
1398 option
1399 .map(|(key, value)| on_key_value_fn(key.to_ref(), value.as_ref()))
1400 .transpose()
1401 }
1402 }
1403
1404 impl<P> StateStoreRead for StateStorePointer<P>
1405 where
1406 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StateStoreGet + StaticSendSync,
1407 {
1408 type Iter = BoxStateStoreReadIter;
1409 type RevIter = BoxStateStoreReadIter;
1410
1411 fn iter(
1412 &self,
1413 key_range: TableKeyRange,
1414
1415 read_options: ReadOptions,
1416 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1417 self.as_ref().iter(key_range, read_options)
1418 }
1419
1420 fn rev_iter(
1421 &self,
1422 key_range: TableKeyRange,
1423
1424 read_options: ReadOptions,
1425 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1426 self.as_ref().rev_iter(key_range, read_options)
1427 }
1428 }
1429
1430 impl StateStoreReadLog for StateStoreDynRef {
1431 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1432
1433 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1434 (*self.0).next_epoch(epoch, options).await
1435 }
1436
1437 fn iter_log(
1438 &self,
1439 epoch_range: (u64, u64),
1440 key_range: TableKeyRange,
1441 options: ReadLogOptions,
1442 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1443 (*self.0).iter_log(epoch_range, key_range, options)
1444 }
1445 }
1446
1447 pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1448
1449 impl AsHummock for StateStoreDynRef {
1450 fn as_hummock(&self) -> Option<&HummockStorage> {
1451 (*self.0).as_hummock()
1452 }
1453 }
1454
1455 impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1456
1457 impl StateStore for StateStoreDynRef {
1458 type Local = BoxDynLocalStateStore;
1459 type ReadSnapshot = StateStoreReadSnapshotDynRef;
1460 type VectorWriter = BoxDynStateStoreWriteVector;
1461
1462 fn try_wait_epoch(
1463 &self,
1464 epoch: HummockReadEpoch,
1465 options: TryWaitEpochOptions,
1466 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1467 (*self.0).try_wait_epoch(epoch, options)
1468 }
1469
1470 fn new_local(
1471 &self,
1472 option: NewLocalOptions,
1473 ) -> impl Future<Output = Self::Local> + Send + '_ {
1474 (*self.0).new_local(option)
1475 }
1476
1477 async fn new_read_snapshot(
1478 &self,
1479 epoch: HummockReadEpoch,
1480 options: NewReadSnapshotOptions,
1481 ) -> StorageResult<Self::ReadSnapshot> {
1482 (*self.0).new_read_snapshot(epoch, options).await
1483 }
1484
1485 fn new_vector_writer(
1486 &self,
1487 options: NewVectorWriterOptions,
1488 ) -> impl Future<Output = Self::VectorWriter> + Send + '_ {
1489 (*self.0).new_vector_writer(options)
1490 }
1491 }
1492}