risingwave_storage/hummock/
sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, Hint, HybridCache, HybridCacheBuilder,
    HybridCacheEntry, HybridCacheProperties,
};
use futures::{FutureExt, StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockEntry, BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};
macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expected {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

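/// A self-referential cache-entry holder: `_cache_entry` keeps the enum value alive in
/// the in-memory cache while `ptr` points directly at the pinned payload, so callers can
/// deref to the concrete meta type without re-matching the enum on every access.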
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `VectorMetaFileHolder` is exposed only immutably, and the pointee is
        // pinned via `Box::pin`, so `ptr` stays valid for as long as `_cache_entry`
        // keeps the underlying file alive.
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading from the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read from the cache but do not fill it afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}
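
// Illustrative usage sketch (not part of the original source): a typical cached read
// goes through `SstableStore::get` with a policy, e.g.
//
//     let block = sstable_store
//         .get(&sst, block_index, CachePolicy::Fill(Hint::Normal), &mut stats)
//         .await?;
//
// where `sstable_store`, `sst`, `block_index`, and `stats` are assumed to be in scope.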

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,
    pub skip_bloom_filter_in_serde: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether the object store is divided into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the existing cluster is a new cluster.
    ///
    /// The value of `use_new_object_prefix_strategy` is taken from the field of the same
    /// name in the system parameters: it is set to `true` for a new cluster and `false`
    /// for an old cluster. The final decision of whether to divide prefixes is based on
    /// both this field and the specific object store type, which preserves backward
    /// compatibility.
    use_new_object_prefix_strategy: bool,

    /// SST serde happens when an SST meta is written to the meta disk cache.
    /// Excluding the bloom filter from serde reduces the meta disk cache entry size
    /// and the disk I/O throughput, at the cost of making the bloom filter useless.
    skip_bloom_filter_in_serde: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate the path early. Otherwise the object store won't
        // report an invalid-path error until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: config.skip_bloom_filter_in_serde,
        }
    }

    /// For the compactor, we do not need highly concurrent cache access. Instead, we
    /// need the cache to evict entries more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* The compactor won't use this parameter, so just assign a default value. */
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: false,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            stats.cache_data_block_total += 1;
            if entry.source() == foyer::Source::Outer {
                stats.cache_data_block_miss += 1;
            }
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
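        // Scan the requested window and record how many blocks are already cached, as
        // well as the first cache hit strictly after `block_index`. Both feed the
        // heuristic below that shrinks the prefetch window when much of it is cached.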
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

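        // If at least a third of the window is already cached, or the first cache hit
        // lies in the second half of the window, stop the prefetch at that first hit
        // to avoid re-reading data that is likely already cached.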
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("cancelled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy again to avoid holding the whole prefetch buffer in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        let file_size = sst.meta.estimated_size;
        let data_path = Arc::new(self.get_sst_data_path(object_id));

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

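        // Record both a whole-SST entry (the `usize::MAX` sentinel described on the
        // `recent_filter` field) and a per-block entry, so recency can be queried at
        // either granularity.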
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        // Future that fetches the block from the object store on a hybrid cache miss.
        let fetch_block = async move {
            let block_data = match store
                .read(&data_path, range.clone())
                .instrument_await("get_block_response".verbose())
                .await
            {
                Ok(data) => data,
                Err(e) => {
                    tracing::error!(
                        "get_block_response failed when reading {:?} from sst-{}, total length: {}",
                        range,
                        object_id,
                        file_size
                    );
                    return Err(HummockError::from(e));
                }
            };
            let block = Box::new(Block::decode(block_data, uncompressed_capacity)?);
            Ok(block)
        };

        match policy {
            CachePolicy::Fill(hint) => {
                let properties = HybridCacheProperties::default().with_hint(hint);
                let fetch = self.block_cache.get_or_fetch(&idx, || {
                    fetch_block.map(|res| res.map(|block| (block, properties)))
                });
                Ok(BlockResponse::Fetch(fetch))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block.await?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block.await?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        let block_response = self.get_block_response(sst, block_index, policy).await?;
        let block_holder = block_response.wait().await?;
        stats.cache_data_block_total += 1;
        if let BlockEntry::HybridCache(entry) = block_holder.entry()
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_data_block_miss += 1;
        }
        Ok(block_holder)
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let meta_offset = vector_file.meta_offset;
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&vector_file.object_id.as_raw(), || async move {
                let encoded_footer = store.read(&path, meta_offset..).await?;
                let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(meta))
            })
            .await?;
        stats.file_meta_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_meta_miss += 1;
        }
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let start_offset = block_meta.offset;
        let end_offset = start_offset + block_meta.block_size;
        let entry = self
            .vector_block_cache
            .get_or_fetch(&(vector_file.object_id, block_idx), || async move {
                let encoded_block = store.read(&path, start_offset..end_offset).await?;
                let block = VectorBlock::decode(&encoded_block)?;
                Ok::<_, anyhow::Error>(Box::new(block))
            })
            .await
            .map_err(HummockError::foyer_error)?;

        stats.file_block_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_block_miss += 1;
        }
        Ok(entry)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let store = self.store.clone();
        let graph_file_path =
            self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&graph_file.object_id.as_raw(), || async move {
                let encoded_graph = store.read(&graph_file_path, ..).await?;
                let graph = PbHnswGraph::decode(encoded_graph.as_ref())?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(graph))
            })
            .await
            .map_err(HummockError::foyer_error)?;
        stats.hnsw_graph_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.hnsw_graph_miss += 1;
        }
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }
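
    // Illustrative note: with `path = "test"` and no object prefix, SSTable object id
    // 123 maps to `"test/123.data"` (see `test_basic` below); the exact layout is
    // delegated to `risingwave_hummock_sdk::get_object_data_path`.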

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns a future that resolves to the `TableHolder` for the given SST's metadata.
    ///
    /// The returned future is `'static` and captures no borrow of `self` (note the
    /// `use<>` bound), so it can be awaited or spawned independently of the store.
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;
        let store = self.store.clone();
        let meta_path = self.get_sst_data_path(object_id);
        let stats_ptr = stats.remote_io_time.clone();
        let range = sstable_info_ref.meta_offset as usize..;
        let skip_bloom_filter_in_serde = self.skip_bloom_filter_in_serde;

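        // Note (assumption based on foyer's fetch semantics): `get_or_fetch` is expected
        // to deduplicate concurrent fetches of the same object id, so parallel readers
        // of one SST meta should trigger a single object-store read.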
        let entry = self.meta_cache.get_or_fetch(&object_id, || async move {
            let now = Instant::now();
            let buf = store
                .read(&meta_path, range)
                .instrument_await("get_meta_response".verbose())
                .await?;
            let meta = SstableMeta::decode(&buf[..])?;

            let sst = Sstable::new(object_id, meta, skip_bloom_filter_in_serde);
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
            Ok::<_, anyhow::Error>(Box::new(sst))
        });

        stats.cache_meta_block_total += 1;

        async move {
            entry
                .instrument_await("fetch_meta".verbose())
                .await
                .map_err(HummockError::foyer_error)
        }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta, self.skip_bloom_filter_in_serde);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be
        // cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;
#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
972}