1use std::collections::HashSet;
16use std::fmt::Debug;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use enum_as_inner::EnumAsInner;
21use foyer::{
22 DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RateLimitPicker,
23};
24use futures::FutureExt;
25use futures::future::BoxFuture;
26use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
27use risingwave_common::catalog::TableId;
28use risingwave_common::license::Feature;
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common_service::RpcNotificationClient;
31use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
32use risingwave_object_store::object::build_remote_object_store;
33use thiserror_ext::AsReport;
34
35use crate::StateStore;
36use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
37use crate::error::StorageResult;
38use crate::hummock::hummock_meta_client::MonitoredHummockMetaClient;
39use crate::hummock::{
40 Block, BlockCacheEventListener, HummockError, HummockStorage, RecentFilter, Sstable,
41 SstableBlockIndex, SstableStore, SstableStoreConfig,
42};
43use crate::memory::MemoryStateStore;
44use crate::memory::sled::SledStateStore;
45use crate::monitor::{
46 CompactorMetrics, HummockStateStoreMetrics, MonitoredStateStore, MonitoredStorageMetrics,
47 ObjectStoreMetrics,
48};
49use crate::opts::StorageOpts;
50
51static FOYER_METRICS_REGISTRY: LazyLock<Box<PrometheusMetricsRegistry>> = LazyLock::new(|| {
52 Box::new(PrometheusMetricsRegistry::new(
53 GLOBAL_METRICS_REGISTRY.clone(),
54 ))
55});
56
/// Private module defining the opaque store types re-exported as `HummockStorageType`,
/// `MemoryStateStoreType` and `SledStateStoreType`.
///
/// Each alias is an `impl StateStore + AsHummock` type. Its concrete type is fixed by the
/// constructor function of the same backend below. Outside this module only the trait
/// bounds are visible, not the debug-only wrapper layers added by `may_verify` /
/// `may_dynamic_dispatch`.
mod opaque_type {
    use super::*;

    pub type HummockStorageType = impl StateStore + AsHummock;
    pub type MemoryStateStoreType = impl StateStore + AsHummock;
    pub type SledStateStoreType = impl StateStore + AsHummock;

    /// Builds the opaque in-memory store. In debug builds it is boxed behind dynamic
    /// dispatch; in release builds it is returned as is.
    pub fn in_memory(state_store: MemoryStateStore) -> MemoryStateStoreType {
        may_dynamic_dispatch(state_store)
    }

    /// Builds the opaque Hummock store. The store is first passed through the optional
    /// verify layer, then through the optional dynamic-dispatch layer. Both layers only
    /// take effect in debug builds.
    pub fn hummock(state_store: HummockStorage) -> HummockStorageType {
        may_dynamic_dispatch(may_verify(state_store))
    }

    /// Builds the opaque sled store. In debug builds it is boxed behind dynamic dispatch.
    pub fn sled(state_store: SledStateStore) -> SledStateStoreType {
        may_dynamic_dispatch(state_store)
    }
}
76pub use opaque_type::{HummockStorageType, MemoryStateStoreType, SledStateStoreType};
77use opaque_type::{hummock, in_memory, sled};
78
/// Wrapper applied to every `StateStoreImpl` variant when the `hm-trace` feature is
/// enabled: a `TracedStateStore` inside the metrics-monitoring layer.
#[cfg(feature = "hm-trace")]
type Monitored<S> = MonitoredStateStore<crate::monitor::traced_store::TracedStateStore<S>>;

/// Wrapper applied to every `StateStoreImpl` variant without `hm-trace`: only the
/// metrics-monitoring layer.
#[cfg(not(feature = "hm-trace"))]
type Monitored<S> = MonitoredStateStore<S>;
84
85fn monitored<S: StateStore>(
86 state_store: S,
87 storage_metrics: Arc<MonitoredStorageMetrics>,
88) -> Monitored<S> {
89 let inner = {
90 #[cfg(feature = "hm-trace")]
91 {
92 crate::monitor::traced_store::TracedStateStore::new_global(state_store)
93 }
94 #[cfg(not(feature = "hm-trace"))]
95 {
96 state_store
97 }
98 };
99 inner.monitored(storage_metrics)
100}
101
102fn inner<S>(state_store: &Monitored<S>) -> &S {
103 let inner = state_store.inner();
104 {
105 #[cfg(feature = "hm-trace")]
106 {
107 inner.inner()
108 }
109 #[cfg(not(feature = "hm-trace"))]
110 {
111 inner
112 }
113 }
114}
115
/// The state store backends this crate can be configured with (see `StateStoreImpl::new`).
///
/// Every variant holds its store wrapped in [`Monitored`].
#[derive(Clone, EnumAsInner)]
// All variant names share the `StateStore` suffix, which this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub enum StateStoreImpl {
    /// Hummock storage, selected by a `hummock+<object store url>` string.
    HummockStateStore(Monitored<HummockStorageType>),
    /// In-memory store, selected by `in_memory` / `in-memory`. `dispatch_state_store!`
    /// panics on this variant in release builds.
    MemoryStateStore(Monitored<MemoryStateStoreType>),
    /// Sled-backed store, selected by `sled://<path>`. `dispatch_state_store!` panics on
    /// this variant in release builds.
    SledStateStore(Monitored<SledStateStoreType>),
}
136
/// Returns `state_store` unchanged in release builds.
///
/// In debug builds (`debug_assertions`) the store is moved into an `Arc` and wrapped in a
/// `StateStorePointer` over a trait object, so its calls go through dynamic dispatch.
fn may_dynamic_dispatch(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
    #[cfg(not(debug_assertions))]
    {
        state_store
    }
    #[cfg(debug_assertions)]
    {
        use crate::store_impl::dyn_state_store::StateStorePointer;
        // `as _` lets the compiler pick the trait-object type expected by `StateStorePointer`.
        StateStorePointer(Arc::new(state_store) as _)
    }
}
148
149fn may_verify(state_store: impl StateStore + AsHummock) -> impl StateStore + AsHummock {
150 #[cfg(not(debug_assertions))]
151 {
152 state_store
153 }
154 #[cfg(debug_assertions)]
155 {
156 use std::marker::PhantomData;
157
158 use risingwave_common::util::env_var::env_var_is_true;
159 use tracing::info;
160
161 use crate::store_impl::verify::VerifyStateStore;
162
163 let expected = if env_var_is_true("ENABLE_STATE_STORE_VERIFY") {
164 info!("enable verify state store");
165 Some(SledStateStore::new_temp())
166 } else {
167 info!("verify state store is not enabled");
168 None
169 };
170 VerifyStateStore {
171 actual: state_store,
172 expected,
173 _phantom: PhantomData::<()>,
174 }
175 }
176}
177
178impl StateStoreImpl {
179 fn in_memory(
180 state_store: MemoryStateStore,
181 storage_metrics: Arc<MonitoredStorageMetrics>,
182 ) -> Self {
183 Self::MemoryStateStore(monitored(in_memory(state_store), storage_metrics))
185 }
186
187 pub fn hummock(
188 state_store: HummockStorage,
189 storage_metrics: Arc<MonitoredStorageMetrics>,
190 ) -> Self {
191 Self::HummockStateStore(monitored(hummock(state_store), storage_metrics))
193 }
194
195 pub fn sled(
196 state_store: SledStateStore,
197 storage_metrics: Arc<MonitoredStorageMetrics>,
198 ) -> Self {
199 Self::SledStateStore(monitored(sled(state_store), storage_metrics))
200 }
201
202 pub fn shared_in_memory_store(storage_metrics: Arc<MonitoredStorageMetrics>) -> Self {
203 Self::in_memory(MemoryStateStore::shared(), storage_metrics)
204 }
205
206 pub fn for_test() -> Self {
207 Self::in_memory(
208 MemoryStateStore::new(),
209 Arc::new(MonitoredStorageMetrics::unused()),
210 )
211 }
212
213 pub fn as_hummock(&self) -> Option<&HummockStorage> {
214 match self {
215 StateStoreImpl::HummockStateStore(hummock) => {
216 Some(inner(hummock).as_hummock().expect("should be hummock"))
217 }
218 _ => None,
219 }
220 }
221}
222
223impl Debug for StateStoreImpl {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 match self {
226 StateStoreImpl::HummockStateStore(_) => write!(f, "HummockStateStore"),
227 StateStoreImpl::MemoryStateStore(_) => write!(f, "MemoryStateStore"),
228 StateStoreImpl::SledStateStore(_) => write!(f, "SledStateStore"),
229 }
230 }
231}
232
/// Evaluates `$body` with `$store` bound to the store held by the [`StateStoreImpl`]
/// value `$impl`, whichever variant it is.
///
/// - `$body` is a single token tree, typically a `{ ... }` block.
/// - `$store` binds according to normal match ergonomics, so it binds by reference when
///   `$impl` is a reference.
///
/// In release builds (`debug_assertions` disabled), `$body` is only expanded in the
/// Hummock arm; the memory and sled arms panic with `unimplemented!` instead. The
/// `#[cfg]` attributes are evaluated in the crate where the macro is expanded.
#[macro_export]
macro_rules! dispatch_state_store {
    ($impl:expr, $store:ident, $body:tt) => {{
        use $crate::store_impl::StateStoreImpl;

        match $impl {
            StateStoreImpl::MemoryStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Consume the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("memory state store should never be used in release mode");
                }
            }

            StateStoreImpl::SledStateStore($store) => {
                #[cfg(debug_assertions)]
                {
                    $body
                }
                #[cfg(not(debug_assertions))]
                {
                    // Consume the binding so it is not reported as unused.
                    let _store = $store;
                    unimplemented!("sled state store should never be used in release mode");
                }
            }

            StateStoreImpl::HummockStateStore($store) => $body,
        }
    }};
}
271
/// A state store wrapper that mirrors operations onto an optional reference store and
/// asserts that both stores behave the same.
#[cfg(any(debug_assertions, test, feature = "test"))]
pub mod verify {
    use std::fmt::Debug;
    use std::future::Future;
    use std::marker::PhantomData;
    use std::ops::Deref;
    use std::sync::Arc;

    use bytes::Bytes;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::HummockReadEpoch;
    use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
    use tracing::log::warn;

    use crate::error::StorageResult;
    use crate::hummock::HummockStorage;
    use crate::store::*;
    use crate::store_impl::AsHummock;

    /// Asserts that two results agree.
    ///
    /// - Two `Ok` values must be equal; a mismatch is logged as a warning and then
    ///   fails the `assert_eq!`.
    /// - Two `Err`s are accepted without comparing the errors.
    /// - One `Ok` and one `Err` logs a warning and panics.
    fn assert_result_eq<Item: PartialEq + Debug, E>(
        first: &std::result::Result<Item, E>,
        second: &std::result::Result<Item, E>,
    ) {
        match (first, second) {
            (Ok(first), Ok(second)) => {
                if first != second {
                    warn!("result different: {:?} {:?}", first, second);
                }
                assert_eq!(first, second);
            }
            (Err(_), Err(_)) => {}
            _ => {
                warn!("one success and one failed");
                panic!("result not equal");
            }
        }
    }

    /// Pairs an `actual` store (or iterator) with an optional `expected` one.
    ///
    /// Operations run on `actual`. When `expected` is present, they are also run on
    /// `expected` and the outcomes are compared. The returned value is always the one
    /// from `actual`.
    ///
    /// `T` is only used when this type acts as a `StateStoreIter<T>` (see `verify_iter`);
    /// for stores it stays `()`.
    #[derive(Clone)]
    pub struct VerifyStateStore<A, E, T = ()> {
        pub actual: A,
        pub expected: Option<E>,
        pub _phantom: PhantomData<T>,
    }

    /// Exposes the Hummock storage of the `actual` store only.
    impl<A: AsHummock, E: AsHummock> AsHummock for VerifyStateStore<A, E> {
        fn as_hummock(&self) -> Option<&HummockStorage> {
            self.actual.as_hummock()
        }
    }

    impl<A: StateStoreRead, E: StateStoreRead> StateStoreRead for VerifyStateStore<A, E> {
        type Iter = impl StateStoreReadIter;
        type RevIter = impl StateStoreReadIter;

        /// Reads from both stores and compares them with `assert_result_eq`.
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,

            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>> {
            let actual = self
                .actual
                .get_keyed_row(key.clone(), read_options.clone())
                .await;
            if let Some(expected) = &self.expected {
                let expected = expected.get_keyed_row(key, read_options).await;
                assert_result_eq(&actual, &expected);
            }
            actual
        }

        /// Opens iterators on both stores. Creation errors from either store are
        /// propagated with `?`. Items are compared one by one as the iterator advances.
        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        /// Reverse counterpart of `iter`, with the same error and comparison behavior.
        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }
    }

    impl<A: StateStoreReadLog, E: StateStoreReadLog> StateStoreReadLog for VerifyStateStore<A, E> {
        type ChangeLogIter = impl StateStoreReadChangeLogIter;

        /// Asserts both stores report the same next epoch; an error from either store is
        /// propagated.
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            let actual = self.actual.next_epoch(epoch, options.clone()).await?;
            if let Some(expected) = &self.expected {
                assert_eq!(actual, expected.next_epoch(epoch, options).await?);
            }
            Ok(actual)
        }

        /// Opens change-log iterators on both stores and compares their items one by one.
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<Self::ChangeLogIter> {
            let actual = self
                .actual
                .iter_log(epoch_range, key_range.clone(), options.clone())
                .await?;
            let expected = if let Some(expected) = &self.expected {
                Some(expected.iter_log(epoch_range, key_range, options).await?)
            } else {
                None
            };

            Ok(verify_iter::<StateStoreReadLogItem>(actual, expected))
        }
    }

    /// Iterator mode: advances both iterators in lockstep and asserts each pair of items
    /// (including end-of-stream `None`s) is equal.
    impl<A: StateStoreIter<T>, E: StateStoreIter<T>, T: IterItem> StateStoreIter<T>
        for VerifyStateStore<A, E, T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            let actual = self.actual.try_next().await?;
            if let Some(expected) = self.expected.as_mut() {
                let expected = expected.try_next().await?;
                assert_eq!(actual, expected);
            }
            Ok(actual)
        }
    }

    /// Pairs an actual iterator with an optional expected one, so items are compared as
    /// they are consumed.
    fn verify_iter<T: IterItem>(
        actual: impl StateStoreIter<T>,
        expected: Option<impl StateStoreIter<T>>,
    ) -> impl StateStoreIter<T>
    where
        for<'a> T::ItemRef<'a>: PartialEq + Debug,
    {
        VerifyStateStore {
            actual,
            expected,
            _phantom: PhantomData::<T>,
        }
    }

    impl<A: LocalStateStore, E: LocalStateStore> LocalStateStore for VerifyStateStore<A, E> {
        type FlushedSnapshotReader =
            VerifyStateStore<A::FlushedSnapshotReader, E::FlushedSnapshotReader>;

        type Iter<'a> = impl StateStoreIter + 'a;
        type RevIter<'a> = impl StateStoreIter + 'a;

        /// Point read on both stores, compared with `assert_result_eq`.
        async fn get(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<Bytes>> {
            let actual = self.actual.get(key.clone(), read_options.clone()).await;
            if let Some(expected) = &self.expected {
                let expected = expected.get(key, read_options).await;
                assert_result_eq(&actual, &expected);
            }
            actual
        }

        #[expect(clippy::manual_async_fn)]
        fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        #[expect(clippy::manual_async_fn)]
        fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
            async move {
                let actual = self
                    .actual
                    .rev_iter(key_range.clone(), read_options.clone())
                    .await?;
                let expected = if let Some(expected) = &self.expected {
                    Some(expected.rev_iter(key_range, read_options).await?)
                } else {
                    None
                };

                Ok(verify_iter::<StateStoreKeyedRow>(actual, expected))
            }
        }

        /// Applies the write to `expected` first, then `actual`. If the expected store
        /// fails, `actual` is not touched.
        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.insert(key.clone(), new_val.clone(), old_val.clone())?;
            }
            self.actual.insert(key, new_val, old_val)?;

            Ok(())
        }

        /// Same ordering as `insert`: `expected` first, then `actual`.
        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.delete(key.clone(), old_val.clone())?;
            }
            self.actual.delete(key, old_val)?;
            Ok(())
        }

        /// Flushes `expected` then `actual`. Only `actual`'s flushed size is returned;
        /// the expected store's count is discarded, not compared.
        async fn flush(&mut self) -> StorageResult<usize> {
            if let Some(expected) = &mut self.expected {
                expected.flush().await?;
            }
            self.actual.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            if let Some(expected) = &mut self.expected {
                expected.try_flush().await?;
            }
            self.actual.try_flush().await
        }

        /// Unlike the write methods, this initializes `actual` before `expected`.
        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.actual.init(options.clone()).await?;
            if let Some(expected) = &mut self.expected {
                expected.init(options).await?;
            }
            Ok(())
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            if let Some(expected) = &mut self.expected {
                expected.seal_current_epoch(next_epoch, opts.clone());
            }
            self.actual.seal_current_epoch(next_epoch, opts);
        }

        /// Asserts both stores are at the same epoch.
        fn epoch(&self) -> u64 {
            let epoch = self.actual.epoch();
            if let Some(expected) = &self.expected {
                assert_eq!(epoch, expected.epoch());
            }
            epoch
        }

        /// Asserts both stores agree on whether they hold unflushed writes.
        fn is_dirty(&self) -> bool {
            let ret = self.actual.is_dirty();
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.is_dirty());
            }
            ret
        }

        /// Updates both stores and asserts they return the same previous bitmap.
        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            let ret = self.actual.update_vnode_bitmap(vnodes.clone()).await?;
            if let Some(expected) = &mut self.expected {
                assert_eq!(ret, expected.update_vnode_bitmap(vnodes).await?);
            }
            Ok(ret)
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            let ret = self.actual.get_table_watermark(vnode);
            if let Some(expected) = &self.expected {
                assert_eq!(ret, expected.get_table_watermark(vnode));
            }
            ret
        }

        /// Pairs the flushed-snapshot readers of both stores in a new verifying wrapper.
        fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
            VerifyStateStore {
                actual: self.actual.new_flushed_snapshot_reader(),
                expected: self.expected.as_ref().map(E::new_flushed_snapshot_reader),
                _phantom: Default::default(),
            }
        }
    }

    impl<A: StateStore, E: StateStore> StateStore for VerifyStateStore<A, E> {
        type Local = VerifyStateStore<A::Local, E::Local>;
        type ReadSnapshot = VerifyStateStore<A::ReadSnapshot, E::ReadSnapshot>;

        /// Waits only on the `actual` store; the expected store is not waited on.
        fn try_wait_epoch(
            &self,
            epoch: HummockReadEpoch,
            options: TryWaitEpochOptions,
        ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
            self.actual.try_wait_epoch(epoch, options)
        }

        /// Creates local stores on `expected` (first) and `actual`, then pairs them.
        async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_local(option.clone()).await)
            } else {
                None
            };
            VerifyStateStore {
                actual: self.actual.new_local(option).await,
                expected,
                _phantom: PhantomData::<()>,
            }
        }

        /// Creates read snapshots on `expected` (first) and `actual`, then pairs them.
        async fn new_read_snapshot(
            &self,
            epoch: HummockReadEpoch,
            options: NewReadSnapshotOptions,
        ) -> StorageResult<Self::ReadSnapshot> {
            let expected = if let Some(expected) = &self.expected {
                Some(expected.new_read_snapshot(epoch, options).await?)
            } else {
                None
            };
            Ok(VerifyStateStore {
                actual: self.actual.new_read_snapshot(epoch, options).await?,
                expected,
                _phantom: PhantomData::<()>,
            })
        }
    }

    /// Lets callers reach the `actual` store's own methods directly.
    impl<A, E> Deref for VerifyStateStore<A, E> {
        type Target = A;

        fn deref(&self) -> &Self::Target {
            &self.actual
        }
    }
}
657
658impl StateStoreImpl {
659 #[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
660 #[allow(clippy::too_many_arguments)]
661 #[expect(clippy::borrowed_box)]
662 pub async fn new(
663 s: &str,
664 opts: Arc<StorageOpts>,
665 hummock_meta_client: Arc<MonitoredHummockMetaClient>,
666 state_store_metrics: Arc<HummockStateStoreMetrics>,
667 object_store_metrics: Arc<ObjectStoreMetrics>,
668 storage_metrics: Arc<MonitoredStorageMetrics>,
669 compactor_metrics: Arc<CompactorMetrics>,
670 await_tree_config: Option<await_tree::Config>,
671 use_new_object_prefix_strategy: bool,
672 ) -> StorageResult<Self> {
673 const KB: usize = 1 << 10;
674 const MB: usize = 1 << 20;
675
676 let meta_cache = {
677 let mut builder = HybridCacheBuilder::new()
678 .with_name("foyer.meta")
679 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
680 .memory(opts.meta_cache_capacity_mb * MB)
681 .with_shards(opts.meta_cache_shard_num)
682 .with_eviction_config(opts.meta_cache_eviction_config.clone())
683 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
684 u64::BITS as usize / 8 + value.estimate_size()
685 })
686 .storage(Engine::Large);
687
688 if !opts.meta_file_cache_dir.is_empty() {
689 if let Err(e) = Feature::ElasticDiskCache.check_available() {
690 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
691 } else {
692 builder = builder
693 .with_device_options(
694 DirectFsDeviceOptions::new(&opts.meta_file_cache_dir)
695 .with_capacity(opts.meta_file_cache_capacity_mb * MB)
696 .with_file_size(opts.meta_file_cache_file_capacity_mb * MB),
697 )
698 .with_recover_mode(opts.meta_file_cache_recover_mode)
699 .with_compression(opts.meta_file_cache_compression)
700 .with_runtime_options(opts.meta_file_cache_runtime_config.clone())
701 .with_large_object_disk_cache_options(
702 LargeEngineOptions::new()
703 .with_indexer_shards(opts.meta_file_cache_indexer_shards)
704 .with_flushers(opts.meta_file_cache_flushers)
705 .with_reclaimers(opts.meta_file_cache_reclaimers)
706 .with_buffer_pool_size(
707 opts.meta_file_cache_flush_buffer_threshold_mb * MB,
708 ) .with_clean_region_threshold(
710 opts.meta_file_cache_reclaimers
711 + opts.meta_file_cache_reclaimers / 2,
712 )
713 .with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
714 .with_blob_index_size(16 * KB),
715 );
716 if opts.meta_file_cache_insert_rate_limit_mb > 0 {
717 builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
718 opts.meta_file_cache_insert_rate_limit_mb * MB,
719 )));
720 }
721 }
722 }
723
724 builder.build().await.map_err(HummockError::foyer_error)?
725 };
726
727 let block_cache = {
728 let mut builder = HybridCacheBuilder::new()
729 .with_name("foyer.data")
730 .with_metrics_registry(FOYER_METRICS_REGISTRY.clone())
731 .with_event_listener(Arc::new(BlockCacheEventListener::new(
732 state_store_metrics.clone(),
733 )))
734 .memory(opts.block_cache_capacity_mb * MB)
735 .with_shards(opts.block_cache_shard_num)
736 .with_eviction_config(opts.block_cache_eviction_config.clone())
737 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
738 u64::BITS as usize * 2 / 8 + value.raw().len()
740 })
741 .storage(Engine::Large);
742
743 if !opts.data_file_cache_dir.is_empty() {
744 if let Err(e) = Feature::ElasticDiskCache.check_available() {
745 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
746 } else {
747 builder = builder
748 .with_device_options(
749 DirectFsDeviceOptions::new(&opts.data_file_cache_dir)
750 .with_capacity(opts.data_file_cache_capacity_mb * MB)
751 .with_file_size(opts.data_file_cache_file_capacity_mb * MB),
752 )
753 .with_recover_mode(opts.data_file_cache_recover_mode)
754 .with_compression(opts.data_file_cache_compression)
755 .with_runtime_options(opts.data_file_cache_runtime_config.clone())
756 .with_large_object_disk_cache_options(
757 LargeEngineOptions::new()
758 .with_indexer_shards(opts.data_file_cache_indexer_shards)
759 .with_flushers(opts.data_file_cache_flushers)
760 .with_reclaimers(opts.data_file_cache_reclaimers)
761 .with_buffer_pool_size(
762 opts.data_file_cache_flush_buffer_threshold_mb * MB,
763 ) .with_clean_region_threshold(
765 opts.data_file_cache_reclaimers
766 + opts.data_file_cache_reclaimers / 2,
767 )
768 .with_recover_concurrency(opts.data_file_cache_recover_concurrency)
769 .with_blob_index_size(16 * KB),
770 );
771 if opts.data_file_cache_insert_rate_limit_mb > 0 {
772 builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
773 opts.data_file_cache_insert_rate_limit_mb * MB,
774 )));
775 }
776 }
777 }
778
779 builder.build().await.map_err(HummockError::foyer_error)?
780 };
781
782 let recent_filter = if opts.data_file_cache_dir.is_empty() {
783 None
784 } else {
785 Some(Arc::new(RecentFilter::new(
786 opts.cache_refill_recent_filter_layers,
787 Duration::from_millis(opts.cache_refill_recent_filter_rotate_interval_ms as u64),
788 )))
789 };
790
791 let store = match s {
792 hummock if hummock.starts_with("hummock+") => {
793 let object_store = build_remote_object_store(
794 hummock.strip_prefix("hummock+").unwrap(),
795 object_store_metrics.clone(),
796 "Hummock",
797 Arc::new(opts.object_store_config.clone()),
798 )
799 .await;
800
801 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
802 store: Arc::new(object_store),
803 path: opts.data_directory.clone(),
804 prefetch_buffer_capacity: opts.prefetch_buffer_capacity_mb * (1 << 20),
805 max_prefetch_block_number: opts.max_prefetch_block_number,
806 recent_filter,
807 state_store_metrics: state_store_metrics.clone(),
808 use_new_object_prefix_strategy,
809
810 meta_cache,
811 block_cache,
812 }));
813 let notification_client =
814 RpcNotificationClient::new(hummock_meta_client.get_inner().clone());
815 let compaction_catalog_manager_ref =
816 Arc::new(CompactionCatalogManager::new(Box::new(
817 RemoteTableAccessor::new(hummock_meta_client.get_inner().clone()),
818 )));
819
820 let inner = HummockStorage::new(
821 opts.clone(),
822 sstable_store,
823 hummock_meta_client.clone(),
824 notification_client,
825 compaction_catalog_manager_ref,
826 state_store_metrics.clone(),
827 compactor_metrics.clone(),
828 await_tree_config,
829 )
830 .await?;
831
832 StateStoreImpl::hummock(inner, storage_metrics)
833 }
834
835 "in_memory" | "in-memory" => {
836 tracing::warn!(
837 "In-memory state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
838 );
839 StateStoreImpl::shared_in_memory_store(storage_metrics.clone())
840 }
841
842 sled if sled.starts_with("sled://") => {
843 tracing::warn!(
844 "sled state store should never be used in end-to-end benchmarks or production environment. Scaling and recovery are not supported."
845 );
846 let path = sled.strip_prefix("sled://").unwrap();
847 StateStoreImpl::sled(SledStateStore::new(path), storage_metrics.clone())
848 }
849
850 other => unimplemented!("{} state store is not supported", other),
851 };
852
853 Ok(store)
854 }
855}
856
857pub trait AsHummock: Send + Sync {
858 fn as_hummock(&self) -> Option<&HummockStorage>;
859
860 fn sync(
861 &self,
862 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
863 ) -> BoxFuture<'_, StorageResult<SyncResult>> {
864 async move {
865 if let Some(hummock) = self.as_hummock() {
866 hummock.sync(sync_table_epochs).await
867 } else {
868 Ok(SyncResult::default())
869 }
870 }
871 .boxed()
872 }
873}
874
875impl AsHummock for HummockStorage {
876 fn as_hummock(&self) -> Option<&HummockStorage> {
877 Some(self)
878 }
879}
880
881impl AsHummock for MemoryStateStore {
882 fn as_hummock(&self) -> Option<&HummockStorage> {
883 None
884 }
885}
886
887impl AsHummock for SledStateStore {
888 fn as_hummock(&self) -> Option<&HummockStorage> {
889 None
890 }
891}
892
893#[cfg(debug_assertions)]
894mod dyn_state_store {
895 use std::future::Future;
896 use std::ops::DerefMut;
897 use std::sync::Arc;
898
899 use bytes::Bytes;
900 use risingwave_common::bitmap::Bitmap;
901 use risingwave_common::hash::VirtualNode;
902 use risingwave_hummock_sdk::HummockReadEpoch;
903 use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
904
905 use crate::error::StorageResult;
906 use crate::hummock::HummockStorage;
907 use crate::store::*;
908 use crate::store_impl::AsHummock;
909
    /// Object-safe counterpart of [`StateStoreIter`], so that iterators of different
    /// concrete types can be boxed behind one trait object.
    #[async_trait::async_trait]
    pub trait DynStateStoreIter<T: IterItem>: Send {
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>>;
    }

    /// Blanket impl: every `StateStoreIter<T>` is also a `DynStateStoreIter<T>`.
    #[async_trait::async_trait]
    impl<T: IterItem, I: StateStoreIter<T>> DynStateStoreIter<T> for I {
        // Intended to delegate to the concrete iterator's `StateStoreIter::try_next`.
        async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
            self.try_next().await
        }
    }

    /// Boxed, type-erased iterator that borrows for `'a`.
    pub type BoxStateStoreIter<'a, T> = Box<dyn DynStateStoreIter<T> + 'a>;
    /// Makes the boxed trait object usable wherever a `StateStoreIter<T>` is expected,
    /// by forwarding to the boxed object's `try_next`.
    impl<T: IterItem> StateStoreIter<T> for BoxStateStoreIter<'_, T> {
        fn try_next(
            &mut self,
        ) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_ {
            self.deref_mut().try_next()
        }
    }

    /// Owned (`'static`) boxed iterator over keyed rows.
    pub type BoxStateStoreReadIter = BoxStateStoreIter<'static, StateStoreKeyedRow>;
    /// Owned (`'static`) boxed iterator over change-log items.
    pub type BoxStateStoreReadChangeLogIter = BoxStateStoreIter<'static, StateStoreReadLogItem>;
935
    /// Object-safe counterpart of [`StateStoreRead`]. Iterators are returned as
    /// [`BoxStateStoreReadIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreRead: StaticSendSync {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,

            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>>;

        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter>;
    }

    /// Object-safe counterpart of [`StateStoreReadLog`]. Change-log iterators are
    /// returned as [`BoxStateStoreReadChangeLogIter`].
    #[async_trait::async_trait]
    pub trait DynStateStoreReadLog: StaticSendSync {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64>;
        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter>;
    }

    /// Shared, type-erased reader handle.
    pub type StateStoreReadDynRef = StateStorePointer<Arc<dyn DynStateStoreRead>>;

    /// Blanket impl: every [`StateStoreRead`] is a [`DynStateStoreRead`]. Calls delegate
    /// to the concrete store, and returned iterators are boxed.
    #[async_trait::async_trait]
    impl<S: StateStoreRead> DynStateStoreRead for S {
        async fn get_keyed_row(
            &self,
            key: TableKey<Bytes>,

            read_options: ReadOptions,
        ) -> StorageResult<Option<StateStoreKeyedRow>> {
            self.get_keyed_row(key, read_options).await
        }

        async fn iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,

            read_options: ReadOptions,
        ) -> StorageResult<BoxStateStoreReadIter> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }
    }

    /// Blanket impl: every [`StateStoreReadLog`] is a [`DynStateStoreReadLog`]. Calls
    /// delegate to the concrete store, and change-log iterators are boxed.
    #[async_trait::async_trait]
    impl<S: StateStoreReadLog> DynStateStoreReadLog for S {
        async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
            self.next_epoch(epoch, options).await
        }

        async fn iter_log(
            &self,
            epoch_range: (u64, u64),
            key_range: TableKeyRange,
            options: ReadLogOptions,
        ) -> StorageResult<BoxStateStoreReadChangeLogIter> {
            Ok(Box::new(
                self.iter_log(epoch_range, key_range, options).await?,
            ))
        }
    }
1020
    /// Boxed iterator over keyed rows, borrowing the local store for `'a`.
    pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a, StateStoreKeyedRow>;
    /// Object-safe counterpart of [`LocalStateStore`]. Iterators are boxed, and flushed
    /// snapshot readers are returned as [`StateStoreReadDynRef`].
    #[async_trait::async_trait]
    pub trait DynLocalStateStore: StaticSendSync {
        async fn get(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<Bytes>>;

        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>>;

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef;

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()>;

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;

        async fn flush(&mut self) -> StorageResult<usize>;

        async fn try_flush(&mut self) -> StorageResult<()>;

        fn epoch(&self) -> u64;

        fn is_dirty(&self) -> bool;

        // NOTE(review): despite its name, `epoch` here is the `InitOptions` value; the
        // implementation below calls the same parameter `options`.
        async fn init(&mut self, epoch: InitOptions) -> StorageResult<()>;

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>>;

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
    }

    /// Blanket impl: every [`LocalStateStore`] is a [`DynLocalStateStore`]. Each method
    /// delegates to the concrete store; iterators are boxed and flushed snapshot readers
    /// are put behind an `Arc` in a [`StateStorePointer`].
    #[async_trait::async_trait]
    impl<S: LocalStateStore> DynLocalStateStore for S {
        async fn get(
            &self,
            key: TableKey<Bytes>,
            read_options: ReadOptions,
        ) -> StorageResult<Option<Bytes>> {
            self.get(key, read_options).await
        }

        async fn iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.iter(key_range, read_options).await?))
        }

        async fn rev_iter(
            &self,
            key_range: TableKeyRange,
            read_options: ReadOptions,
        ) -> StorageResult<BoxLocalStateStoreIterStream<'_>> {
            Ok(Box::new(self.rev_iter(key_range, read_options).await?))
        }

        fn new_flushed_snapshot_reader(&self) -> StateStoreReadDynRef {
            StateStorePointer(Arc::new(self.new_flushed_snapshot_reader()) as _)
        }

        fn insert(
            &mut self,
            key: TableKey<Bytes>,
            new_val: Bytes,
            old_val: Option<Bytes>,
        ) -> StorageResult<()> {
            self.insert(key, new_val, old_val)
        }

        fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
            self.delete(key, old_val)
        }

        async fn flush(&mut self) -> StorageResult<usize> {
            self.flush().await
        }

        async fn try_flush(&mut self) -> StorageResult<()> {
            self.try_flush().await
        }

        fn epoch(&self) -> u64 {
            self.epoch()
        }

        fn is_dirty(&self) -> bool {
            self.is_dirty()
        }

        async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
            self.init(options).await
        }

        fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
            self.seal_current_epoch(next_epoch, opts)
        }

        async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
            self.update_vnode_bitmap(vnodes).await
        }

        fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
            self.get_table_watermark(vnode)
        }
    }

    /// Owned, type-erased local state store handle.
    pub type BoxDynLocalStateStore = StateStorePointer<Box<dyn DynLocalStateStore>>;
1148
1149 impl LocalStateStore for BoxDynLocalStateStore {
1150 type FlushedSnapshotReader = StateStoreReadDynRef;
1151 type Iter<'a> = BoxLocalStateStoreIterStream<'a>;
1152 type RevIter<'a> = BoxLocalStateStoreIterStream<'a>;
1153
1154 fn get(
1155 &self,
1156 key: TableKey<Bytes>,
1157 read_options: ReadOptions,
1158 ) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_ {
1159 (*self.0).get(key, read_options)
1160 }
1161
1162 fn iter(
1163 &self,
1164 key_range: TableKeyRange,
1165 read_options: ReadOptions,
1166 ) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_ {
1167 (*self.0).iter(key_range, read_options)
1168 }
1169
1170 fn rev_iter(
1171 &self,
1172 key_range: TableKeyRange,
1173 read_options: ReadOptions,
1174 ) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_ {
1175 (*self.0).rev_iter(key_range, read_options)
1176 }
1177
1178 fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
1179 (*self.0).new_flushed_snapshot_reader()
1180 }
1181
1182 fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes> {
1183 (*self.0).get_table_watermark(vnode)
1184 }
1185
1186 fn insert(
1187 &mut self,
1188 key: TableKey<Bytes>,
1189 new_val: Bytes,
1190 old_val: Option<Bytes>,
1191 ) -> StorageResult<()> {
1192 (*self.0).insert(key, new_val, old_val)
1193 }
1194
1195 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
1196 (*self.0).delete(key, old_val)
1197 }
1198
1199 fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
1200 (*self.0).flush()
1201 }
1202
1203 fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1204 (*self.0).try_flush()
1205 }
1206
1207 fn epoch(&self) -> u64 {
1208 (*self.0).epoch()
1209 }
1210
1211 fn is_dirty(&self) -> bool {
1212 (*self.0).is_dirty()
1213 }
1214
1215 fn init(
1216 &mut self,
1217 options: InitOptions,
1218 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1219 (*self.0).init(options)
1220 }
1221
1222 fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
1223 (*self.0).seal_current_epoch(next_epoch, opts)
1224 }
1225
1226 async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
1227 (*self.0).update_vnode_bitmap(vnodes).await
1228 }
1229 }
1230
/// Object-safe mirror of the `StateStore` methods `try_wait_epoch`, `new_local` and
/// `new_read_snapshot`.
///
/// Results are type-erased. The local store is returned as a [`BoxDynLocalStateStore`] and the
/// snapshot as a [`StateStoreReadDynRef`], so callers holding a `dyn` store need not know the
/// concrete store type.
#[async_trait::async_trait]
pub trait DynStateStoreExt: StaticSendSync {
    /// Object-safe form of `StateStore::try_wait_epoch`.
    async fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
        options: TryWaitEpochOptions,
    ) -> StorageResult<()>;

    /// Object-safe form of `StateStore::new_local`; the new local store comes back boxed.
    async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore;
    /// Object-safe form of `StateStore::new_read_snapshot`; the snapshot comes back as a
    /// type-erased shared reference.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<StateStoreReadDynRef>;
}
1248
1249 #[async_trait::async_trait]
1250 impl<S: StateStore> DynStateStoreExt for S {
1251 async fn try_wait_epoch(
1252 &self,
1253 epoch: HummockReadEpoch,
1254 options: TryWaitEpochOptions,
1255 ) -> StorageResult<()> {
1256 self.try_wait_epoch(epoch, options).await
1257 }
1258
1259 async fn new_local(&self, option: NewLocalOptions) -> BoxDynLocalStateStore {
1260 StateStorePointer(Box::new(self.new_local(option).await))
1261 }
1262
1263 async fn new_read_snapshot(
1264 &self,
1265 epoch: HummockReadEpoch,
1266 options: NewReadSnapshotOptions,
1267 ) -> StorageResult<StateStoreReadDynRef> {
1268 Ok(StateStorePointer(Arc::new(
1269 self.new_read_snapshot(epoch, options).await?,
1270 )))
1271 }
1272 }
1273
/// Shared, type-erased handle to a full state store: an `Arc<dyn DynStateStore>` wrapped in
/// [`StateStorePointer`]. Cloning the handle clones the `Arc`.
pub type StateStoreDynRef = StateStorePointer<Arc<dyn DynStateStore>>;
1275
1276 macro_rules! state_store_pointer_dyn_as_ref {
1277 ($pointer:ident < dyn $source_dyn_trait:ident > , $target_dyn_trait:ident) => {
1278 impl AsRef<dyn $target_dyn_trait>
1279 for StateStorePointer<$pointer<dyn $source_dyn_trait>>
1280 {
1281 fn as_ref(&self) -> &dyn $target_dyn_trait {
1282 (&*self.0) as _
1283 }
1284 }
1285 };
1286 }
1287
1288 state_store_pointer_dyn_as_ref!(Arc<dyn DynStateStoreRead>, DynStateStoreRead);
1289
/// Newtype around a smart pointer (this module uses `Box` and `Arc`) that holds a
/// type-erased state store trait object.
///
/// This module implements the static store traits (`StateStoreRead`, `StateStoreReadLog`,
/// `LocalStateStore`, `StateStore`) for specific `StateStorePointer` instantiations. Each
/// implementation forwards to the trait object inside. `Clone` clones the wrapped pointer.
#[derive(Clone)]
pub struct StateStorePointer<P>(pub(crate) P);
1292
1293 impl<P> StateStoreRead for StateStorePointer<P>
1294 where
1295 StateStorePointer<P>: AsRef<dyn DynStateStoreRead> + StaticSendSync,
1296 {
1297 type Iter = BoxStateStoreReadIter;
1298 type RevIter = BoxStateStoreReadIter;
1299
1300 fn get_keyed_row(
1301 &self,
1302 key: TableKey<Bytes>,
1303
1304 read_options: ReadOptions,
1305 ) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_ {
1306 self.as_ref().get_keyed_row(key, read_options)
1307 }
1308
1309 fn iter(
1310 &self,
1311 key_range: TableKeyRange,
1312
1313 read_options: ReadOptions,
1314 ) -> impl Future<Output = StorageResult<Self::Iter>> + '_ {
1315 self.as_ref().iter(key_range, read_options)
1316 }
1317
1318 fn rev_iter(
1319 &self,
1320 key_range: TableKeyRange,
1321
1322 read_options: ReadOptions,
1323 ) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
1324 self.as_ref().rev_iter(key_range, read_options)
1325 }
1326 }
1327
1328 impl StateStoreReadLog for StateStoreDynRef {
1329 type ChangeLogIter = BoxStateStoreReadChangeLogIter;
1330
1331 async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
1332 (*self.0).next_epoch(epoch, options).await
1333 }
1334
1335 fn iter_log(
1336 &self,
1337 epoch_range: (u64, u64),
1338 key_range: TableKeyRange,
1339 options: ReadLogOptions,
1340 ) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_ {
1341 (*self.0).iter_log(epoch_range, key_range, options)
1342 }
1343 }
1344
1345 pub trait DynStateStore: DynStateStoreReadLog + DynStateStoreExt + AsHummock {}
1346
1347 impl AsHummock for StateStoreDynRef {
1348 fn as_hummock(&self) -> Option<&HummockStorage> {
1349 (*self.0).as_hummock()
1350 }
1351 }
1352
1353 impl<S: DynStateStoreReadLog + DynStateStoreExt + AsHummock> DynStateStore for S {}
1354
1355 impl StateStore for StateStoreDynRef {
1356 type Local = BoxDynLocalStateStore;
1357 type ReadSnapshot = StateStoreReadDynRef;
1358
1359 fn try_wait_epoch(
1360 &self,
1361 epoch: HummockReadEpoch,
1362 options: TryWaitEpochOptions,
1363 ) -> impl Future<Output = StorageResult<()>> + Send + '_ {
1364 (*self.0).try_wait_epoch(epoch, options)
1365 }
1366
1367 fn new_local(
1368 &self,
1369 option: NewLocalOptions,
1370 ) -> impl Future<Output = Self::Local> + Send + '_ {
1371 (*self.0).new_local(option)
1372 }
1373
1374 async fn new_read_snapshot(
1375 &self,
1376 epoch: HummockReadEpoch,
1377 options: NewReadSnapshotOptions,
1378 ) -> StorageResult<Self::ReadSnapshot> {
1379 (*self.0).new_read_snapshot(epoch, options).await
1380 }
1381 }
1382}