use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, Hint, HybridCache, HybridCacheBuilder,
    HybridCacheEntry, HybridCacheProperties,
};
use futures::{FutureExt, StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockEntry, BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

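// Generates the `HummockVectorIndexMetaFile` enum with one pinned-box variant per meta
// file type, a `From` conversion for each type, and a typed `try_from_entry` that checks
// the cached variant and captures a pointer to its contents.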
macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

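            // SAFETY: `ptr` in the holder points into the pinned box owned by
            // `_cache_entry`, which stays alive (and is never moved) for as long as the
            // holder exists, so the holder is as sendable as the cache entry itself.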
            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId,
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

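/// Holds a typed pointer into a vector-index meta file that is kept alive by its cache
/// entry, so callers can deref to the concrete meta type without re-matching the enum.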
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `ptr` was taken from the pinned box held by `_cache_entry`, which
        // outlives `self`.
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

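/// Key of the block cache: an SST object id plus the block's index within the SST.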
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

/// Reports a block's efficiency to the metrics whenever the entry leaves the cache.
pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

/// How a read should interact with the block/meta caches.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Bypass the cache: always read from the object store and never fill the cache.
    Disable,
    /// Read through the cache and fill it on miss, with the given admission hint.
    Fill(Hint),
    /// Read from the cache if possible, but do not fill it on miss.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,
    pub skip_bloom_filter_in_serde: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    use_new_object_prefix_strategy: bool,

    skip_bloom_filter_in_serde: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: config.skip_bloom_filter_in_serde,
        }
    }

    /// Creates an `SstableStore` for the compactor, with small single-shard caches and
    /// a no-op recent filter.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                // Weight: the key (a u64) plus the estimated size of the sstable meta.
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // Weight: the key (two u64s) plus the raw block payload.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16,
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: false,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
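        // If the prefetch buffer is already saturated, do not enqueue more speculative
        // reads; fetch only the block that was actually requested.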
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        // Serve directly from the block cache when the first requested block is cached.
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            stats.cache_data_block_total += 1;
            if entry.source() == foyer::Source::Outer {
                stats.cache_data_block_miss += 1;
            }
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
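        // Trim the prefetch window: clamp it to `max_prefetch_block_number` and the end
        // of the SST, then scan for already-cached blocks. If at least a third of the
        // window is cached, or the first cache hit lies past the window's midpoint, end
        // the prefetch at that first hit to avoid re-reading cached data.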
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                // Only the block that was explicitly requested keeps the caller's hint;
                // speculatively prefetched blocks are inserted with low priority.
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        let file_size = sst.meta.estimated_size;
        let data_path = Arc::new(self.get_sst_data_path(object_id));

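        // Fail point that lets tests force reads to bypass the block cache.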
        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

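        // Record recency for both the whole SST (the `usize::MAX` sentinel index) and
        // the specific block, so either granularity can be checked later.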
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        let fetch_block = async move {
            let block_data = match store
                .read(&data_path, range.clone())
                .instrument_await("get_block_response".verbose())
                .await
            {
                Ok(data) => data,
                Err(e) => {
                    tracing::error!(
                        "get_block_response failed when reading {:?} from sst-{}, total length: {}",
                        range,
                        object_id,
                        file_size
                    );
                    return Err(HummockError::from(e));
                }
            };
            let block = Box::new(Block::decode(block_data, uncompressed_capacity)?);
            Ok(block)
        };

        match policy {
            CachePolicy::Fill(hint) => {
                let properties = HybridCacheProperties::default().with_hint(hint);
                let fetch = self.block_cache.get_or_fetch(&idx, || {
                    fetch_block.map(|res| res.map(|block| (block, properties)))
                });
                Ok(BlockResponse::Fetch(fetch))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block.await?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block.await?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        let block_response = self.get_block_response(sst, block_index, policy).await?;
        let block_holder = block_response.wait().await?;
        stats.cache_data_block_total += 1;
        if let BlockEntry::HybridCache(entry) = block_holder.entry()
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_data_block_miss += 1;
        }
        Ok(block_holder)
    }

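    // Vector meta cache entries hold either a `VectorFileMeta` footer or a `PbHnswGraph`,
    // keyed by the file's raw object id; `try_from_entry` checks that the cached variant
    // matches the type the caller expects.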
    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let meta_offset = vector_file.meta_offset;
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&vector_file.object_id.as_raw(), || async move {
                let encoded_footer = store.read(&path, meta_offset..).await?;
                let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(meta))
            })
            .await?;
        stats.file_meta_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_meta_miss += 1;
        }
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let start_offset = block_meta.offset;
        let end_offset = start_offset + block_meta.block_size;
        let entry = self
            .vector_block_cache
            .get_or_fetch(&(vector_file.object_id, block_idx), || async move {
                let encoded_block = store.read(&path, start_offset..end_offset).await?;
                let block = VectorBlock::decode(&encoded_block)?;
                Ok::<_, anyhow::Error>(Box::new(block))
            })
            .await
            .map_err(HummockError::foyer_error)?;

        stats.file_block_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_block_miss += 1;
        }
        Ok(entry)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let store = self.store.clone();
        let graph_file_path =
            self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&graph_file.object_id.as_raw(), || async move {
                let encoded_graph = store.read(&graph_file_path, ..).await?;
                let graph = PbHnswGraph::decode(encoded_graph.as_ref())?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(graph))
            })
            .await
            .map_err(HummockError::foyer_error)?;
        stats.hnsw_graph_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.hnsw_graph_miss += 1;
        }
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<TableHolder> {
        let object_id = sstable_info_ref.object_id;
        let store = self.store.clone();
        let meta_path = self.get_sst_data_path(object_id);
        let stats_ptr = stats.remote_io_time.clone();
        let range = sstable_info_ref.meta_offset as usize..;
        let skip_bloom_filter_in_serde = self.skip_bloom_filter_in_serde;

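        // Cache-miss path: read the metadata footer (from `meta_offset` to EOF), decode
        // it, and account the elapsed remote I/O time in milliseconds.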
        let fetch = self.meta_cache.get_or_fetch(&object_id, || async move {
            let now = Instant::now();
            let buf = store
                .read(&meta_path, range)
                .instrument_await("get_meta_response".verbose())
                .await?;
            let meta = SstableMeta::decode(&buf[..])?;

            let sst = Sstable::new(object_id, meta, skip_bloom_filter_in_serde);
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
            Ok::<_, anyhow::Error>(Box::new(sst))
        });

        stats.cache_meta_block_total += 1;
        let entry = fetch
            .instrument_await("fetch_meta".verbose())
            .await
            .map_err(HummockError::foyer_error);
        if let Ok(ref entry) = entry
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_meta_block_miss += 1;
        }
        entry
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        // Keep only objects with the SST suffix; pass errors through to the caller.
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta, self.skip_bloom_filter_in_serde);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
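        // Issue the streaming read from a spawned task; a join error is surfaced below
        // as a possible cancellation.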
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result; this read request may have been canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        // The loaded sstable's meta has its bloom filter stripped, so clear it in the
        // expected meta before comparing.
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
974}