use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, Hint, HybridCache, HybridCacheBuilder,
    HybridCacheEntry, HybridCacheProperties,
};
use futures::{FutureExt, StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockEntry, BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

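/// Generates `HummockVectorIndexMetaFile`, a type-erased wrapper for vector-index
/// metadata kept in the in-memory meta cache, plus a typed `VectorMetaFileHolder`
/// constructor for each listed type. As a rough sketch, the expansion for a single
/// type `T` looks like this (illustrative only, not the literal output):
///
/// ```ignore
/// pub enum HummockVectorIndexMetaFile {
///     T(Pin<Box<T>>),
/// }
///
/// impl From<T> for HummockVectorIndexMetaFile {
///     fn from(v: T) -> Self {
///         Self::T(Box::pin(v))
///     }
/// }
///
/// impl VectorMetaFileHolder<T> {
///     /// Downcasts the cache entry to `T`, keeping the entry alive so that the
///     /// raw pointer into the pinned box stays valid.
///     fn try_from_entry(/* entry, object_id */) -> HummockResult<Self> { /* ... */ }
/// }
/// ```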
macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            // SAFETY (assumption): the value is pinned inside a `Box` owned by the
            // cache entry, and the listed meta types carry no thread-affine state,
            // so sending the holder (entry plus raw pointer) across threads is
            // expected to be sound.
            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId,
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

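/// A self-referential holder: `_cache_entry` keeps the `Pin<Box<T>>` inside the
/// cache alive, while `ptr` points into that pinned allocation, so `Deref` can hand
/// out `&T` without re-matching the enum on every access.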
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

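/// Cache key for a single block: the owning SST object id plus the block's index
/// within that SST.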
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and don't fill the cache afterwards.
    Disable,
    /// Try reading the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read the cache but don't fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,
    pub skip_bloom_filter_in_serde: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    use_new_object_prefix_strategy: bool,

    skip_bloom_filter_in_serde: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: config.skip_bloom_filter_in_serde,
        }
    }

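    /// Builds a minimal `SstableStore` for compactors: single-shard in-memory
    /// caches with no disk tier configured here, a no-op recent filter, and small
    /// fixed-capacity vector caches.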
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16,
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: false,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

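    /// Reads blocks `[block_index, end_index)` of `sst` with a single object-store
    /// request and returns them as a `BlockStream`. The range is capped by
    /// `max_prefetch_block_number` and shrunk when later blocks are already cached.
    /// If the prefetch buffer is over capacity or the first block is already
    /// cached, it falls back to returning just that single block.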
    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            stats.cache_data_block_total += 1;
            if entry.source() == foyer::Source::Outer {
                stats.cache_data_block_miss += 1;
            }
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

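        // Heuristic (as implemented below): if at least a third of the requested
        // blocks are already cached, or the first cached block after `block_index`
        // lies past the midpoint of the range, truncate the prefetch at that first
        // cached block to avoid re-reading data that is already in the cache.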
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                // Only the requested block keeps the caller's hint; the readahead
                // blocks are inserted with a low-priority hint.
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

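    /// Fetches one block, honoring `policy`: `Fill` goes through the hybrid cache's
    /// `get_or_fetch` (inserting on miss with the given hint), `NotFill` serves
    /// from the cache if present but never inserts, and `Disable` always reads
    /// from the object store.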
    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        let file_size = sst.meta.estimated_size;
        let data_path = Arc::new(self.get_sst_data_path(object_id));

        // Fail point used in tests to force the block cache off for this read.
        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

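        // Record recency at both granularities: `(object_id, usize::MAX)` appears
        // to act as an object-level sentinel, alongside the per-block entry.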
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        let fetch_block = async move {
            let block_data = match store
                .read(&data_path, range.clone())
                .instrument_await("get_block_response".verbose())
                .await
            {
                Ok(data) => data,
                Err(e) => {
                    tracing::error!(
                        "get_block_response failed when reading {:?} from sst-{}, total length: {}",
                        range,
                        object_id,
                        file_size
                    );
                    return Err(HummockError::from(e));
                }
            };
            let block = Box::new(Block::decode(block_data, uncompressed_capacity)?);
            Ok(block)
        };

        match policy {
            CachePolicy::Fill(hint) => {
                let properties = HybridCacheProperties::default().with_hint(hint);
                let fetch = self.block_cache.get_or_fetch(&idx, || {
                    fetch_block.map(|res| res.map(|block| (block, properties)))
                });
                Ok(BlockResponse::Fetch(fetch))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block.await?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block.await?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

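    /// Convenience wrapper over `get_block_response`: awaits the response and
    /// records block-cache hit/miss statistics. A minimal usage sketch, assuming a
    /// surrounding async context with `sstable_store`, `sst`, and `stats` in scope:
    ///
    /// ```ignore
    /// let block = sstable_store
    ///     .get(&sst, 0, CachePolicy::Fill(Hint::Normal), &mut stats)
    ///     .await?;
    /// ```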
    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        let block_response = self.get_block_response(sst, block_index, policy).await?;
        let block_holder = block_response.wait().await?;
        stats.cache_data_block_total += 1;
        if let BlockEntry::HybridCache(entry) = block_holder.entry()
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_data_block_miss += 1;
        }
        Ok(block_holder)
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let meta_offset = vector_file.meta_offset;
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&vector_file.object_id.as_raw(), || async move {
                let encoded_footer = store.read(&path, meta_offset..).await?;
                let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(meta))
            })
            .await?;
        stats.file_meta_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_meta_miss += 1;
        }
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let start_offset = block_meta.offset;
        let end_offset = start_offset + block_meta.block_size;
        let entry = self
            .vector_block_cache
            .get_or_fetch(&(vector_file.object_id, block_idx), || async move {
                let encoded_block = store.read(&path, start_offset..end_offset).await?;
                let block = VectorBlock::decode(&encoded_block)?;
                Ok::<_, anyhow::Error>(Box::new(block))
            })
            .await
            .map_err(HummockError::foyer_error)?;

        stats.file_block_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_block_miss += 1;
        }
        Ok(entry)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let store = self.store.clone();
        let graph_file_path =
            self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&graph_file.object_id.as_raw(), || async move {
                let encoded_graph = store.read(&graph_file_path, ..).await?;
                let graph = PbHnswGraph::decode(encoded_graph.as_ref())?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(graph))
            })
            .await
            .map_err(HummockError::foyer_error)?;
        stats.hnsw_graph_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.hnsw_graph_miss += 1;
        }
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

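    /// Returns the cached `Sstable` meta for `sstable_info_ref`, fetching and
    /// decoding the meta block on a miss; cache-miss I/O time is accumulated into
    /// `stats.remote_io_time`. Note the `use<>` capture bound: the returned future
    /// is `'static` and does not borrow `self`, so it can be awaited after the
    /// borrow ends or spawned onto another task.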
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;
        let store = self.store.clone();
        let meta_path = self.get_sst_data_path(object_id);
        let stats_ptr = stats.remote_io_time.clone();
        let range = sstable_info_ref.meta_offset as usize..;
        let skip_bloom_filter_in_serde = self.skip_bloom_filter_in_serde;

        let entry = self.meta_cache.get_or_fetch(&object_id, || async move {
            let now = Instant::now();
            let buf = store
                .read(&meta_path, range)
                .instrument_await("get_meta_response".verbose())
                .await?;
            let meta = SstableMeta::decode(&buf[..])?;

            let sst = Sstable::new(object_id, meta, skip_bloom_filter_in_serde);
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
            Ok::<_, anyhow::Error>(Box::new(sst))
        });

        stats.cache_meta_block_total += 1;

        async move {
            entry
                .instrument_await("fetch_meta".verbose())
                .await
                .map_err(HummockError::foyer_error)
        }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or_default());
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta, self.skip_bloom_filter_in_serde);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        // The loaded sstable's meta has the bloom filter stripped, so clear it
        // from the expected meta before comparing.
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
972}