use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, FetchState, Hint, HybridCache,
    HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties,
};
use futures::{StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::{BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};
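
/// Generates the `HummockVectorIndexMetaFile` enum over the given meta-file types,
/// together with a `From` conversion and a typed `try_from_entry` constructor on
/// `VectorMetaFileHolder` for each type.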
macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            // SAFETY: the raw pointer in `VectorMetaFileHolder` targets pinned memory
            // owned by `_cache_entry`, which is kept alive for the holder's lifetime
            // and is itself `Send`.
            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId,
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);
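
/// Keeps a vector-index meta cache entry alive while exposing a typed reference to the
/// concrete meta file stored inside it. The raw pointer stays valid because it points
/// into the pinned allocation owned by `_cache_entry`.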
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `ptr` points into the pinned allocation held by `_cache_entry`,
        // which outlives `self`, so the dereference is valid.
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;
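
/// Address of a block within an SST object: the owning object id plus the block's index.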
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

/// How a read interacts with the data block cache.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Bypass the cache entirely: always read from the object store and do not fill.
    Disable,
    /// Read through the cache and fill misses with the given eviction hint.
    Fill(Hint),
    /// Read from the cache if present, but do not fill it on a miss.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
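        // Weigh each cache entry as its key size (a u64) plus the value's estimated
        // in-memory size, so the configured capacities are enforced in bytes rather
        // than entry counts.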
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // The key is two u64s: the object id and the block index.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16,
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }
    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
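
        // Probe which blocks in `[block_index, end_index)` are already cached. If at
        // least a third of them are hits, or the first hit beyond `block_index` lies
        // past the midpoint of the range, truncate the prefetch at that first hit so
        // we do not re-read data that is largely cached already.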
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            // Cache the first block with the caller's hint; later blocks are
            // speculative, so insert them with a low-priority hint.
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

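        // Fail-point hook: lets tests force this read path to bypass the block cache.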
        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed when reading {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(foyer::Error::other(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(foyer::Error::other)?,
                );
                Ok(block)
            }
        };

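        // Record recency of both the object as a whole (sentinel block index
        // `usize::MAX`) and the specific block being read.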
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    idx,
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }
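
    /// Fetches a single block and waits for it: [`Self::get_block_response`] followed
    /// by [`BlockResponse::wait`].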
    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        self.get_block_response(sst, block_index, policy, stats)
            .await?
            .wait()
            .await
    }
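
    /// Loads the meta footer of a vector file, reading `meta_offset..` from the object
    /// store and decoding it on cache miss.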
    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
    ) -> HummockResult<VectorFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(vector_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let meta_offset = vector_file.meta_offset;
                async move {
                    let encoded_footer = store
                        .read(&path, meta_offset..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let meta = VectorFileMeta::decode_footer(&encoded_footer)
                        .map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(meta.into())
                }
            })
            .await?;
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
    ) -> HummockResult<VectorBlockHolder> {
        self.vector_block_cache
            .fetch((vector_file.object_id, block_idx), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let start_offset = block_meta.offset;
                let end_offset = start_offset + block_meta.block_size;
                async move {
                    let encoded_block = store
                        .read(&path, start_offset..end_offset)
                        .await
                        .map_err(foyer::Error::other)?;
                    let block = VectorBlock::decode(&encoded_block).map_err(foyer::Error::other)?;
                    Ok(Box::new(block))
                }
            })
            .await
            .map_err(HummockError::foyer_error)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
    ) -> HummockResult<HnswGraphFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(graph_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let graph_file_path =
                    self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
                async move {
                    let encoded_graph = store
                        .read(&graph_file_path, ..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let graph =
                        PbHnswGraph::decode(encoded_graph.as_ref()).map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(graph.into())
                }
            })
            .await?;
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }
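
    /// Returns a future resolving to the sstable meta for `sstable_info_ref`. The cache
    /// fetch is registered synchronously, so concurrent callers deduplicate on the same
    /// in-flight read and the miss counters are updated before the future is awaited.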
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store
                    .read(&meta_path, range)
                    .await
                    .map_err(foyer::Error::other)?;
                let meta = SstableMeta::decode(&buf[..]).map_err(foyer::Error::other)?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches!(entry.state(), FetchState::Wait | FetchState::Miss) {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or_default());
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        // Keep only objects with the SST suffix, but let errors through so callers see them.
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }
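
    /// Opens a streaming read spanning the contiguous blocks described by `metas`
    /// (starting at `metas[0]`) and wraps it in a [`BlockDataStream`].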
    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        // The cache-loaded meta does not retain the bloom filter, so clear it on the
        // expected side before comparing.
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
971}