use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, FetchState, Hint, HybridCache,
    HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties,
};
use futures::{StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId,
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

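// For each listed meta type, `impl_vector_index_meta_file!` generates a variant of
// `HummockVectorIndexMetaFile` holding a `Pin<Box<T>>`, a `From<T>` conversion into that
// variant, and a `try_from_entry` constructor that downcasts a cache entry back into a typed
// `VectorMetaFileHolder<T>`. Roughly, the invocation below expands to:
//
//     pub enum HummockVectorIndexMetaFile {
//         VectorFileMeta(Pin<Box<VectorFileMeta>>),
//         PbHnswGraph(Pin<Box<PbHnswGraph>>),
//     }
//
// plus the corresponding `From`, `Send`, and `try_from_entry` items for both types.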
impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

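/// A typed view into a cached [`HummockVectorIndexMetaFile`].
///
/// `ptr` points into the `Pin<Box<_>>` owned by `_cache_entry`; keeping the cache entry alive
/// keeps the pinned allocation (and thus `ptr`) valid for the holder's lifetime, which is what
/// makes the `Deref` impl below sound. Instances are only constructed through the
/// macro-generated `try_from_entry`.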
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

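/// Cache key of a single SST block: the owning SST object id plus the block's index within
/// that SST.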
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read the cache but do not fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

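    /// Builds an [`SstableStore`] for compactors: both caches are built as single-shard hybrid
    /// caches without attaching a disk device, and the recent filter is the no-op
    /// [`NoneRecentFilter`], on the assumption that compaction reads are mostly one-shot
    /// sequential scans that gain little from cache refill.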
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16,
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

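    /// Prefetches the block range `[block_index, end_index)` of `sst` with a single object-store
    /// read and returns the blocks as a [`BlockStream`]. Falls back to a single-block fetch when
    /// the prefetch buffer is over capacity or the first block is already cached.
    ///
    /// A hedged usage sketch (hypothetical caller; `store`, `sst`, and `stats` are assumed to
    /// exist in scope):
    ///
    /// ```ignore
    /// // Read blocks 0..4 of `sst`, filling the cache with normal priority.
    /// let stream = store
    ///     .prefetch_blocks(&sst, 0, 4, CachePolicy::Fill(Hint::Normal), &mut stats)
    ///     .await?;
    /// ```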
    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

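        // Heuristic: if at least a third of the requested blocks are already cached, or the
        // first cache hit (after the start block) lies beyond the midpoint of the range,
        // truncate the prefetch at that first cached block instead of re-reading data that is
        // largely resident already.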
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed to read {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

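    /// Looks up block `block_index` of `sst` according to `policy`: `Fill` fetches through the
    /// hybrid cache and inserts on miss, `NotFill` reads the cache but returns a one-off owned
    /// block on miss, and `Disable` always reads from the object store. The returned
    /// [`BlockResponse`] resolves to a [`BlockHolder`] via `wait()`, as [`Self::get`] does.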
    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed to read {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(foyer::Error::other(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(foyer::Error::other)?,
                );
                Ok(block)
            }
        };

        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    idx,
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        match self
            .get_block_response(sst, block_index, policy, stats)
            .await
        {
            Ok(block_response) => block_response.wait().await,
            Err(err) => Err(err),
        }
    }

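    /// Loads the footer meta of `vector_file` through `vector_meta_cache`, reading
    /// `meta_offset..` from the object store on a miss, then downcasts the cached
    /// [`HummockVectorIndexMetaFile`] entry into a typed [`VectorFileHolder`].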
    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(vector_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let meta_offset = vector_file.meta_offset;
                async move {
                    let encoded_footer = store
                        .read(&path, meta_offset..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let meta = VectorFileMeta::decode_footer(&encoded_footer)
                        .map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(meta.into())
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.file_meta_miss += 1;
        }
        stats.file_meta_total += 1;

        let entry = entry.await?;
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let entry = self
            .vector_block_cache
            .fetch((vector_file.object_id, block_idx), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let start_offset = block_meta.offset;
                let end_offset = start_offset + block_meta.block_size;
                async move {
                    let encoded_block = store
                        .read(&path, start_offset..end_offset)
                        .await
                        .map_err(foyer::Error::other)?;
                    let block = VectorBlock::decode(&encoded_block).map_err(foyer::Error::other)?;
                    Ok(Box::new(block))
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.file_block_miss += 1;
        }
        stats.file_block_total += 1;

        entry.await.map_err(HummockError::foyer_error)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(graph_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let graph_file_path =
                    self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
                async move {
                    let encoded_graph = store
                        .read(&graph_file_path, ..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let graph =
                        PbHnswGraph::decode(encoded_graph.as_ref()).map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(graph.into())
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.hnsw_graph_miss += 1;
        }
        stats.hnsw_graph_total += 1;

        let entry = entry.await?;
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

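    /// Resolves the object-store path of an SST data object; e.g. object id `123` under store
    /// path `test` maps to `test/123.data` (see `test_basic` in the tests below).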
    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

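    /// Fetches the [`Sstable`] meta for `sstable_info_ref` through the meta cache, reading
    /// `meta_offset..` from the object store on a miss. The returned future is `'static` and
    /// captures nothing from `self` (note the `use<>` capture list), so it can be awaited after
    /// the borrows of `self` and `stats` end; remote IO time is accumulated into
    /// `stats.remote_io_time`.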
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store
                    .read(&meta_path, range)
                    .await
                    .map_err(foyer::Error::other)?;
                let meta = SstableMeta::decode(&buf[..]).map_err(foyer::Error::other)?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
990}