risingwave_storage/hummock/sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, FetchState, Hint, HybridCache,
    HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties,
};
use futures::{StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

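/// Keeps a vector-index meta cache entry alive together with a raw pointer into the pinned meta
/// file stored inside it, so callers can deref the concrete meta type without re-matching
/// `HummockVectorIndexMetaFile`. The pointer remains valid because the pointee is pinned via
/// `Box::pin` and the cache entry is held for as long as the holder lives.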
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `VectorMetaFileHolder` is exposed only immutably, and the pointee is pinned via
        // `Box::pin` inside the cache entry held in `_cache_entry`.
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read the cache but do not fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether the object store is divided into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the existing cluster is a new cluster.
    ///
    /// The value of `use_new_object_prefix_strategy` is determined by the field of the same name
    /// in the system parameters: it is set to `true` for a new cluster and `false` for an old
    /// cluster. The final decision of whether to divide prefixes is based on this field together
    /// with the specific object store type; this approach ensures backward compatibility.
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate `path` early. Otherwise, the object store won't report an
        // invalid path error until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    /// For the compactor, we do not need highly concurrent cache access. Instead, we need the
    /// cache to evict entries more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
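        // A single shard keeps eviction decisions global across the whole cache, trading
        // concurrency for the eviction effectiveness described above.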
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

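    /// Prefetches blocks `[block_index, end_index)` of `sst` with a single object-store read and
    /// returns them as a [`BlockStream`]. Falls back to returning only the requested block when
    /// the prefetch buffer is over capacity or the block is already cached, and shrinks the range
    /// to avoid re-reading blocks that are largely cached already.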
    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

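        // Heuristic: if at least a third of the requested blocks are already cached, or the first
        // cache hit lies beyond the midpoint of the range, truncate the prefetch at the first hit
        // so we do not re-read data that is largely cached.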
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
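        // Spawn the read so that it is driven to completion even if this future is dropped; the
        // object-storage SDK may not be safe to cancel (see also `get_stream_for_blocks`).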
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed when reading {}..{} from sst-{} (object size: {})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("cancelled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy again to avoid holding large data in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

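    /// Reads block `block_index` of `sst`, honoring `policy`: `Fill` fetches through the hybrid
    /// block cache, `NotFill` consults the cache but fetches misses without inserting them, and
    /// `Disable` always reads from the object store directly.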
    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

        // A future that fetches the block from the object store on hybrid cache miss.
        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed when reading {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(foyer::Error::other(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(foyer::Error::other)?,
                );
                Ok(block)
            }
        };

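        // Record both the object-level sentinel entry (`blk_idx == usize::MAX`) and the accessed
        // block in the recent filter.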
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    idx,
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        match self
            .get_block_response(sst, block_index, policy, stats)
            .await
        {
            Ok(block_response) => block_response.wait().await,
            Err(err) => Err(err),
        }
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(vector_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let meta_offset = vector_file.meta_offset;
                async move {
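                    // The file meta is encoded as a footer at the tail of the vector file, so
                    // read from `meta_offset` to the end of the object.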
                    let encoded_footer = store
                        .read(&path, meta_offset..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let meta = VectorFileMeta::decode_footer(&encoded_footer)
                        .map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(meta.into())
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.file_meta_miss += 1;
        }
        stats.file_meta_total += 1;

        let entry = entry.await?;
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let entry = self
            .vector_block_cache
            .fetch((vector_file.object_id, block_idx), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let start_offset = block_meta.offset;
                let end_offset = start_offset + block_meta.block_size;
                async move {
                    let encoded_block = store
                        .read(&path, start_offset..end_offset)
                        .await
                        .map_err(foyer::Error::other)?;
                    let block = VectorBlock::decode(&encoded_block).map_err(foyer::Error::other)?;
                    Ok(Box::new(block))
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.file_block_miss += 1;
        }
        stats.file_block_total += 1;

        entry.await.map_err(HummockError::foyer_error)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(graph_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let graph_file_path =
                    self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
                async move {
                    let encoded_graph = store
                        .read(&graph_file_path, ..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let graph =
                        PbHnswGraph::decode(encoded_graph.as_ref()).map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(graph.into())
                }
            });
        if let FetchState::Miss = entry.state() {
            stats.hnsw_graph_miss += 1;
        }
        stats.hnsw_graph_total += 1;

        let entry = entry.await?;
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns a [`TableHolder`] for the sstable, fetching and caching its meta on cache miss.
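    ///
    /// A minimal usage sketch (hypothetical names, assuming an async context with an
    /// `SstableStore` and a valid `SstableInfo` at hand):
    ///
    /// ```ignore
    /// let mut stats = StoreLocalStatistic::default();
    /// let holder = sstable_store.sstable(&sstable_info, &mut stats).await?;
    /// assert_eq!(holder.id, sstable_info.object_id);
    /// ```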
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store
                    .read(&meta_path, range)
                    .await
                    .map_err(foyer::Error::other)?;
                let meta = SstableMeta::decode(&buf[..]).map_err(foyer::Error::other)?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches!(entry.state(), FetchState::Wait | FetchState::Miss) {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
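        // The range assumes the blocks in `metas` are contiguous on disk: it starts at the first
        // block's offset and spans the sum of their lengths.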
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be safe to cancel.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}