risingwave_storage/hummock/
sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, EventListener, FetchState, Hint, HybridCache,
    HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties,
};
use futures::{StreamExt, future};
use prost::Message;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::{HnswGraphFileInfo, VectorFileInfo};
use risingwave_hummock_sdk::{
    HummockHnswGraphFileId, HummockObjectId, HummockRawObjectId, HummockSstableObjectId,
    HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::PbHnswGraph;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::none::NoneRecentFilter;
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::{BlockHolder, HummockError, HummockResult, RecentFilterTrait};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

macro_rules! impl_vector_index_meta_file {
    ($($type_name:ident),+) => {
        pub enum HummockVectorIndexMetaFile {
            $(
                $type_name(Pin<Box<$type_name>>),
            )+
        }

        $(
            impl From<$type_name> for HummockVectorIndexMetaFile {
                fn from(v: $type_name) -> Self {
                    Self::$type_name(Box::pin(v))
                }
            }

            unsafe impl Send for VectorMetaFileHolder<$type_name> {}

            impl VectorMetaFileHolder<$type_name> {
                fn try_from_entry(
                    entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
                    object_id: HummockRawObjectId,
                ) -> HummockResult<Self> {
                    let HummockVectorIndexMetaFile::$type_name(file_meta) = &*entry else {
                        return Err(HummockError::decode_error(format!(
                            "expect {} for object {}",
                            stringify!($type_name),
                            object_id
                        )));
                    };
                    let ptr = file_meta.as_ref().get_ref() as *const _;
                    Ok(VectorMetaFileHolder {
                        _cache_entry: entry,
                        ptr,
                    })
                }
            }
        )+
    };
}

impl_vector_index_meta_file!(VectorFileMeta, PbHnswGraph);

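// For reference, the invocation above expands to roughly the following
// (abridged sketch of the generated items, not compiled):
//
//     pub enum HummockVectorIndexMetaFile {
//         VectorFileMeta(Pin<Box<VectorFileMeta>>),
//         PbHnswGraph(Pin<Box<PbHnswGraph>>),
//     }
//
//     impl From<VectorFileMeta> for HummockVectorIndexMetaFile { /* Box::pin(v) */ }
//     impl From<PbHnswGraph> for HummockVectorIndexMetaFile { /* Box::pin(v) */ }
//
// plus an `unsafe impl Send` and a `try_from_entry` constructor for each
// `VectorMetaFileHolder<T>`.
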
pub struct VectorMetaFileHolder<T> {
    _cache_entry: CacheEntry<HummockRawObjectId, HummockVectorIndexMetaFile>,
    ptr: *const T,
}

impl<T> Deref for VectorMetaFileHolder<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `VectorMetaFileHolder` is exposed only as immutable, the pointee is pinned via
        // `Box::pin`, and `_cache_entry` keeps it alive for the lifetime of the holder.
        unsafe { &*self.ptr }
    }
}
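
// Usage sketch: a holder derefs to its pinned payload, e.g. with a
// (hypothetical) `holder: VectorFileHolder` in scope:
//
//     let meta: &VectorFileMeta = &*holder;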

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

pub type VectorFileHolder = VectorMetaFileHolder<VectorFileMeta>;
pub type HnswGraphFileHolder = VectorMetaFileHolder<PbHnswGraph>;

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading from the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read from the cache but do not fill the cache afterwards.
    NotFill,
}
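
// Usage sketch (hypothetical `sstable_store`, `sst`, and `stats` in scope):
// read a block through the cache and fill it on miss with normal priority:
//
//     let block = sstable_store
//         .get(&sst, 0, CachePolicy::Fill(Hint::Normal), &mut stats)
//         .await?;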

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockRawObjectId, HummockVectorIndexMetaFile>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Arc<RecentFilter<(HummockSstableObjectId, usize)>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether objects are divided into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the cluster is newly created.
    ///
    /// The value of `use_new_object_prefix_strategy` comes from the field of the same name in
    /// the system parameters: it is set to `true` for a new cluster and `false` for an old one.
    /// The final decision on whether to divide objects into prefixes combines this field with
    /// the specific object store type; this approach ensures backward compatibility.
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate the path early. Otherwise the object store won't report an
        // invalid path until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    /// For the compactor, we do not need highly concurrent cache access. Instead, we need the
    /// cache to evict entries more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage()
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* the compactor won't use this parameter, so just assign a default value. */
            recent_filter: Arc::new(NoneRecentFilter::default().into()),
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }
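
    // Usage sketch (hypothetical arguments): build a small, single-shard store
    // for a compactor node:
    //
    //     let sstable_store = SstableStore::for_compactor(
    //         object_store, "hummock_001".to_owned(), 64 << 20, 64 << 20, true,
    //     ).await?;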

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch met an error when reading {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("cancelled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy again to avoid holding the large read buffer in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }
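
    // Usage sketch (hypothetical `sst` and `stats` in scope): prefetch blocks
    // `[idx, end)` in a single object-store read:
    //
    //     let mut stream = sstable_store
    //         .prefetch_blocks(&sst, idx, end, CachePolicy::Fill(Hint::Low), &mut stats)
    //         .await?;
    //     // then pull `BlockHolder`s from `stream` (see the `BlockStream` trait).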

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        let idx = SstableBlockIndex {
            sst_id: object_id,
            block_idx: block_index as _,
        };

        // Future that fetches the block on hybrid cache miss.
        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response met an error when reading {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(foyer::Error::other(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(foyer::Error::other)?,
                );
                Ok(block)
            }
        };

        self.recent_filter
            .extend([(object_id, usize::MAX), (object_id, block_index)]);

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    idx,
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&idx)
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        self.get_block_response(sst, block_index, policy, stats)
            .await?
            .wait()
            .await
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
    ) -> HummockResult<VectorFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(vector_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let meta_offset = vector_file.meta_offset;
                async move {
                    let encoded_footer = store
                        .read(&path, meta_offset..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let meta = VectorFileMeta::decode_footer(&encoded_footer)
                        .map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(meta.into())
                }
            })
            .await?;
        VectorFileHolder::try_from_entry(entry, vector_file.object_id.as_raw())
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
    ) -> HummockResult<VectorBlockHolder> {
        self.vector_block_cache
            .fetch((vector_file.object_id, block_idx), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let start_offset = block_meta.offset;
                let end_offset = start_offset + block_meta.block_size;
                async move {
                    let encoded_block = store
                        .read(&path, start_offset..end_offset)
                        .await
                        .map_err(foyer::Error::other)?;
                    let block = VectorBlock::decode(&encoded_block).map_err(foyer::Error::other)?;
                    Ok(Box::new(block))
                }
            })
            .await
            .map_err(HummockError::foyer_error)
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), meta.into());
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn insert_hnsw_graph_cache(&self, object_id: HummockHnswGraphFileId, graph: PbHnswGraph) {
        self.vector_meta_cache
            .insert(object_id.as_raw(), graph.into());
    }

    pub async fn get_hnsw_graph(
        &self,
        graph_file: &HnswGraphFileInfo,
    ) -> HummockResult<HnswGraphFileHolder> {
        let entry = self
            .vector_meta_cache
            .fetch(graph_file.object_id.as_raw(), || {
                let store = self.store.clone();
                let graph_file_path =
                    self.get_object_data_path(HummockObjectId::HnswGraphFile(graph_file.object_id));
                async move {
                    let encoded_graph = store
                        .read(&graph_file_path, ..)
                        .await
                        .map_err(foyer::Error::other)?;
                    let graph =
                        PbHnswGraph::decode(encoded_graph.as_ref()).map_err(foyer::Error::other)?;
                    Ok::<_, foyer::Error>(graph.into())
                }
            })
            .await?;
        HnswGraphFileHolder::try_from_entry(entry, graph_file.object_id.as_raw())
    }

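    /// # Example
    ///
    /// A sketch of the id-to-path mapping, assuming a store rooted at `"test"`
    /// (mirrors `test_basic` in the tests below):
    ///
    /// ```ignore
    /// assert_eq!(sstable_store.get_sst_data_path(123), "test/123.data");
    /// ```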
    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns the [`TableHolder`] for the given [`SstableInfo`].
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store
                    .read(&meta_path, range)
                    .await
                    .map_err(foyer::Error::other)?;
                let meta = SstableMeta::decode(&buf[..]).map_err(foyer::Error::other)?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches!(entry.state(), FetchState::Wait | FetchState::Miss) {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }
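
    // Usage sketch (hypothetical `sstable_info` and `stats` in scope): the returned
    // future is `'static`, so it can be awaited in place or spawned:
    //
    //     let holder = sstable_store.sstable(&sstable_info, &mut stats).await?;
    //     let meta = &holder.meta;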

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store.clone();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> &Arc<RecentFilter<(HummockSstableObjectId, usize)>> {
        &self.recent_filter
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
971}