risingwave_storage/hummock/sstable_store.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, Hint, HybridCache, HybridCacheBuilder,
    HybridCacheEntry, HybridCacheProperties,
};
use futures::{FutureExt, StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::vector::monitor::VectorStoreCacheStats;
use crate::hummock::{BlockEntry, BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);
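// For reference, a rough sketch of what the macro generates for `VectorFileMeta` (a
// hand-written approximation, not the literal expansion; the `PbHnswGraph` items are
// analogous):
//
//     pub enum HummockVectorIndexMetaFile {
//         VectorFileMeta(Pin<Box<VectorFileMeta>>),
//         PbHnswGraph(Pin<Box<PbHnswGraph>>),
//     }
//
//     impl From<VectorFileMeta> for HummockVectorIndexMetaFile {
//         fn from(v: VectorFileMeta) -> Self {
//             Self::VectorFileMeta(Box::pin(v))
//         }
//     }
//
//     unsafe impl Send for VectorMetaFileHolder<VectorFileMeta> {}
//
//     impl VectorMetaFileHolder<VectorFileMeta> {
//         fn try_from_entry(..) -> HummockResult<Self> {
//             /* downcast the enum variant, keep the cache entry alive, store a raw pointer */
//         }
//     }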

pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `VectorMetaFileHolder` exposes the pointee only immutably, the pointee is
        // pinned via `Box`, and `_cache_entry` keeps it alive for the holder's lifetime.
        unsafe { &*self.ptr }
    }
}

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

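// Key of the hybrid (memory + disk) block cache. The `Serialize`/`Deserialize` derives are
// presumably required so the key can be persisted by the disk-backed cache tier.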
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
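/// A hypothetical call site choosing a policy per access pattern (`for_compaction` is an
/// illustrative flag, not an existing variable):
///
/// ```ignore
/// let policy = if for_compaction {
///     // Compaction scans should not pollute the cache.
///     CachePolicy::Disable
/// } else {
///     // Serving reads fill the cache with normal priority.
///     CachePolicy::Fill(Hint::Normal)
/// };
/// ```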
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading from the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read from the cache but do not fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,
    pub skip_bloom_filter_in_serde: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether the object store divides objects into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the existing cluster is a new cluster.
    ///
    /// The value of `use_new_object_prefix_strategy` is determined by the field of the same
    /// name in the system parameters: it is set to `true` for a new cluster and `false` for an
    /// old cluster. The final decision on whether to divide prefixes is based on this field
    /// together with the specific object store type; this approach ensures backward
    /// compatibility.
    use_new_object_prefix_strategy: bool,

    /// SST serde happens when an SST meta is written to the meta disk cache.
    /// Excluding the bloom filter from serde reduces the meta disk cache entry size
    /// and the disk IO throughput, at the cost of making the bloom filter useless.
    skip_bloom_filter_in_serde: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate the path early. Otherwise the object store won't report an
        // invalid path until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: config.skip_bloom_filter_in_serde,
        }
    }

    /// For the compactor, we do not need highly concurrent cache loads. Instead, we need the
    /// cache to be evicted more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* the compactor won't use this parameter, so just assign a default value. */
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,
            skip_bloom_filter_in_serde: false,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            stats.cache_data_block_total += 1;
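            // An entry whose source is `Outer` was newly fetched rather than served from the
            // cache, so it is counted as a miss (the same convention is used below).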
            if entry.source() == foyer::Source::Outer {
                stats.cache_data_block_miss += 1;
            }
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

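        // Heuristic: shrink the prefetch window to end at the first cached block past the
        // requested one when either (a) at least one third of the blocks in the window are
        // already cached, or (b) the first cached block lies past the midpoint of the window.
        // This avoids re-reading data that is already in the cache.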
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch met an error when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy again so the decoded block does not keep the whole prefetch buffer alive in
            // memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        let file_size = sst.meta.estimated_size;
        let data_path = Arc::new(self.get_sst_data_path(object_id));

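        // With the `fail` crate, enabling the `disable_block_cache` failpoint forces this
        // closure to return `true` in tests, exercising the cache-disabled path below.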
        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

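        // Record both an object-level sentinel entry (`usize::MAX`, see the doc on
        // `recent_filter`) and the concrete block index as recently accessed.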
        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        // Future that fetches the block from the object store on hybrid cache miss.
        let fetch_block = async move {
            let block_data = match store
                .read(&data_path, range.clone())
                .instrument_await("get_block_response".verbose())
                .await
            {
                Ok(data) => data,
                Err(e) => {
                    tracing::error!(
                        "get_block_response met an error when reading {:?} from sst-{}, total length: {}",
                        range,
                        object_id,
                        file_size
                    );
                    return Err(HummockError::from(e));
                }
            };
            let block = Box::new(Block::decode(block_data, uncompressed_capacity)?);
            Ok(block)
        };

        match policy {
            CachePolicy::Fill(hint) => {
                let properties = HybridCacheProperties::default().with_hint(hint);
                let fetch = self.block_cache.get_or_fetch(&idx, || {
                    fetch_block.map(|res| res.map(|block| (block, properties)))
                });
                Ok(BlockResponse::Fetch(fetch))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block.await?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block.await?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

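    /// Convenience wrapper over [`Self::get_block_response`] that waits for the response and
    /// records block cache hit/miss statistics.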
    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        let block_response = self.get_block_response(sst, block_index, policy).await?;
        let block_holder = block_response.wait().await?;
        stats.cache_data_block_total += 1;
        if let BlockEntry::HybridCache(entry) = block_holder.entry()
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_data_block_miss += 1;
        }
        Ok(block_holder)
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorFileHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let meta_offset = vector_file.meta_offset;
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&vector_file.object_id.as_raw(), || async move {
                let encoded_footer = store.read(&path, meta_offset..).await?;
                let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(meta))
            })
            .await?;
        stats.file_meta_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_meta_miss += 1;
        }
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<VectorBlockHolder> {
        let store = self.store.clone();
        let path = self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
        let start_offset = block_meta.offset;
        let end_offset = start_offset + block_meta.block_size;
        let entry = self
            .vector_block_cache
            .get_or_fetch(&(vector_file.object_id, block_idx), || async move {
                let encoded_block = store.read(&path, start_offset..end_offset).await?;
                let block = VectorBlock::decode(&encoded_block)?;
                Ok::<_, anyhow::Error>(Box::new(block))
            })
            .await
            .map_err(HummockError::foyer_error)?;

        stats.file_block_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.file_block_miss += 1;
        }
        Ok(entry)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
        stats: &mut VectorStoreCacheStats,
    ) -> HummockResult<HnswGraphFileHolder> {
        let store = self.store.clone();
        let graph_file_path =
            self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
        let entry = self
            .vector_meta_cache
            .get_or_fetch(&graph_file.object_id.as_raw(), || async move {
                let encoded_graph = store.read(&graph_file_path, ..).await?;
                let graph = PbHnswGraph::decode(encoded_graph.as_ref())?;
                Ok::<_, anyhow::Error>(HummockVectorIndexMetaFile::from(graph))
            })
            .await
            .map_err(HummockError::foyer_error)?;
        stats.hnsw_graph_total += 1;
        if entry.source() == foyer::Source::Outer {
            stats.hnsw_graph_miss += 1;
        }
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

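    /// Maps an SST object id to its data path under `self.path`. For example, `test_basic`
    /// below expects object id `123` with store path `"test"` to map to `"test/123.data"`
    /// (the exact prefix depends on `use_new_object_prefix_strategy` and the store type).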
    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns the [`TableHolder`] for the given SST, fetching and caching its meta on a cache
    /// miss.
    pub async fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<TableHolder> {
        let object_id = sstable_info_ref.object_id;
        let store = self.store.clone();
        let meta_path = self.get_sst_data_path(object_id);
        let stats_ptr = stats.remote_io_time.clone();
        let range = sstable_info_ref.meta_offset as usize..;
        let skip_bloom_filter_in_serde = self.skip_bloom_filter_in_serde;

        let fetch = self.meta_cache.get_or_fetch(&object_id, || async move {
            let now = Instant::now();
            let buf = store
                .read(&meta_path, range)
                .instrument_await("get_meta_response".verbose())
                .await?;
            let meta = SstableMeta::decode(&buf[..])?;

            let sst = Sstable::new(object_id, meta, skip_bloom_filter_in_serde);
            let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
            stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
            Ok::<_, anyhow::Error>(Box::new(sst))
        });

        stats.cache_meta_block_total += 1;
        let entry = fetch
            .instrument_await("fetch_meta".verbose())
            .await
            .map_err(HummockError::foyer_error);
        if let Ok(ref entry) = entry
            && entry.source() == foyer::Source::Outer
        {
            stats.cache_meta_block_miss += 1;
        }
        entry
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta, self.skip_bloom_filter_in_serde);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}