risingwave_storage/hummock/sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Cache, CacheBuilder, CacheEntry, Engine, EventListener, FetchState, Hint, HybridCache,
    HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties, LargeEngineOptions,
};
use futures::{StreamExt, future};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::vector_index::VectorFileInfo;
use risingwave_hummock_sdk::{
    HummockObjectId, HummockSstableObjectId, HummockVectorFileId, SST_OBJECT_SUFFIX,
};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
use crate::hummock::{BlockHolder, HummockError, HummockResult};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;
pub type VectorFileHolder = CacheEntry<HummockVectorFileId, Box<VectorFileMeta>>;
pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read the cache but do not fill it afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}
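
// A minimal sketch of how a caller might choose a policy. The flags here are
// hypothetical; real callers pass a `CachePolicy` per read path:
//
//     let policy = if bypass_cache {
//         CachePolicy::Disable
//     } else if one_off_scan {
//         CachePolicy::NotFill
//     } else {
//         CachePolicy::Fill(Hint::Normal)
//     };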

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    pub vector_meta_cache: Cache<HummockVectorFileId, Box<VectorFileMeta>>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    pub vector_meta_cache: Cache<HummockVectorFileId, Box<VectorFileMeta>>,
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether the object store divides objects into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the existing cluster is a new cluster.
    ///
    /// The value of `use_new_object_prefix_strategy` is determined by the field of the same
    /// name in the system parameters: it is `true` for a new cluster and `false` for an old
    /// cluster. The final decision of whether to divide prefixes is based on this field
    /// together with the specific object store type; this ensures backward compatibility.
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate path early. Otherwise object store won't report invalid path
        // error until first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,
            vector_meta_cache: config.vector_meta_cache,
            vector_block_cache: config.vector_block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    /// For the compactor, we do not need highly concurrent cache access. Instead, we need
    /// the cache to evict entries more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage(Engine::Large(LargeEngineOptions::new()))
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage(Engine::Large(LargeEngineOptions::new()))
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */
            recent_filter: None,
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
            vector_block_cache: CacheBuilder::new(1 << 10).build(),
        })
    }
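
    // A minimal usage sketch (hypothetical sizes and path): build a single-shard
    // store for a compactor with a 64 MiB block cache and a 16 MiB meta cache.
    //
    //     let sstable_store = SstableStore::for_compactor(
    //         object_store.clone(),
    //         "hummock_001".to_owned(),
    //         64 << 20, // block cache capacity in bytes
    //         16 << 20, // meta cache capacity in bytes
    //         true,     // use_new_object_prefix_strategy
    //     )
    //     .await?;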

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

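        // Heuristic: if at least a third of the requested range is already cached, or the
        // first cached block after `block_index` lies beyond the midpoint of the range,
        // truncate the prefetch at that first cached block to avoid re-reading data that
        // is likely already in the cache.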
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed to read {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("cancelled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy again to avoid holding the large prefetch buffer in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }
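
    // A minimal usage sketch (hypothetical caller): prefetch blocks `[idx, idx + n)`
    // of an sstable and consume them via the returned stream.
    //
    //     let mut stream = sstable_store
    //         .prefetch_blocks(&sst, idx, idx + n, CachePolicy::Fill(Hint::Normal), &mut stats)
    //         .await?;
    //     // ... pull `BlockHolder`s out of the returned `Box<dyn BlockStream>` ...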

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        // Future that fetches the block from the object store on a hybrid cache miss.
        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed to read {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(anyhow::Error::from(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(anyhow::Error::from)?,
                );
                Ok(block)
            }
        };

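        // Record both the object-level sentinel entry (`usize::MAX`, see the doc on
        // `recent_filter`) and the concrete block index in the recent filter.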
        if let Some(filter) = self.recent_filter.as_ref() {
            filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
        }

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    },
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    })
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        self.get_block_response(sst, block_index, policy, stats)
            .await?
            .wait()
            .await
    }

    pub async fn get_vector_file_meta(
        &self,
        vector_file: &VectorFileInfo,
    ) -> HummockResult<VectorFileHolder> {
        self.vector_meta_cache
            .fetch(vector_file.object_id, || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let meta_offset = vector_file.meta_offset;
                async move {
                    let encoded_footer = store.read(&path, meta_offset..).await?;
                    let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
                    Ok(Box::new(meta))
                }
            })
            .await
    }

    pub async fn get_vector_block(
        &self,
        vector_file: &VectorFileInfo,
        block_idx: usize,
        block_meta: &VectorBlockMeta,
    ) -> HummockResult<VectorBlockHolder> {
        self.vector_block_cache
            .fetch((vector_file.object_id, block_idx), || {
                let store = self.store.clone();
                let path =
                    self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
                let start_offset = block_meta.offset;
                let end_offset = start_offset + block_meta.block_size;
                async move {
                    let encoded_block = store.read(&path, start_offset..end_offset).await?;
                    let block = VectorBlock::decode(&encoded_block)?;
                    Ok(Box::new(block))
                }
            })
            .await
    }

    pub fn insert_vector_cache(
        &self,
        object_id: HummockVectorFileId,
        meta: VectorFileMeta,
        blocks: Vec<VectorBlock>,
    ) {
        self.vector_meta_cache.insert(object_id, Box::new(meta));
        for (idx, block) in blocks.into_iter().enumerate() {
            self.vector_block_cache
                .insert((object_id, idx), Box::new(block));
        }
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
    }

    pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
        let obj_prefix = self.store.get_object_prefix(
            object_id.as_raw().inner(),
            self.use_new_object_prefix_strategy,
        );
        risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
    }
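
    // For example, with `path == "test"` and an empty object prefix, sstable object
    // id 123 maps to `"test/123.data"` (see `test_basic` at the bottom of this file).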

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns a future that resolves to the [`TableHolder`] for the given sstable.
    ///
    /// The meta cache fetch is started eagerly, so cache statistics are recorded before
    /// the returned future is awaited.
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store.read(&meta_path, range).await?;
                let meta = SstableMeta::decode(&buf[..])?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

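        // `FetchState::Wait` means another task is already fetching this meta block;
        // count it as a miss for this lookup as well.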
        if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store().clone();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may have been canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn data_recent_filter(
        &self,
    ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}