risingwave_storage/hummock/sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    Engine, EventListener, FetchState, Hint, HybridCache, HybridCacheBuilder, HybridCacheEntry,
    HybridCacheProperties,
};
use futures::{StreamExt, future};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId, SST_OBJECT_SUFFIX};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::{BlockHolder, HummockError, HummockResult};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

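/// Hybrid meta-cache entry holding a loaded [`Sstable`] keyed by its object id.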
pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

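/// Key of a block in the hybrid block cache: the owning SST object id plus the index of the block
/// within that SST.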
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

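/// Block cache event listener that reports a block's access efficiency to the state-store metrics
/// when the entry leaves the cache (e.g., on eviction or removal).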
pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Bypass the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading from the cache and fill the cache afterwards.
    Fill(Hint),
    /// Read from the cache but do not fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(Hint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

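/// Configuration used to construct an [`SstableStore`].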
pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
}

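/// `SstableStore` is the storage-side entry point for reading and writing SST objects: it wraps
/// the underlying object store with a hybrid meta cache, a hybrid block cache, an optional recent
/// filter, and a bounded prefetch buffer.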
pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
    recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether objects are laid out under divided prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the cluster is newly created.
    ///
    /// `use_new_object_prefix_strategy` is read from the system parameter of the same name: it is
    /// set to `true` for a new cluster and `false` for an existing (old) cluster. The final
    /// decision on whether to divide prefixes combines this field with the specific object store
    /// type, which keeps the layout backward compatible.
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate the path early; otherwise the object store won't report an
        // invalid path until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    /// For the compactor, we do not need highly concurrent cache loads. Instead, we need cache
    /// entries to be evicted more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage(Engine::Large)
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage(Engine::Large)
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* The compactor won't use this parameter, so just assign a default value. */
            recent_filter: None,
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
        })
    }

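    /// Deletes the SST object from the object store and drops its cached meta entry.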
    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

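    /// Drops only the cached meta entry for `object_id`; the underlying object is left untouched.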
    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

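    /// Prefetches blocks `[block_index, end_index)` of `sst` with a single object-store read and
    /// returns them as a [`BlockStream`]. Falls back to fetching only `block_index` when the
    /// prefetch buffer is full or that block is already cached.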
    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
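        // Scan the requested range for blocks that are already cached. If at least a third of the
        // range is cached, or the first cached block lies past the midpoint, truncate the prefetch
        // at that block to avoid re-reading data that is already in the cache.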
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed to read {}..{} from sst-{} ({})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("cancelled by other thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy the block data out so we do not keep the whole prefetch buffer alive in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(hint) = policy {
                let hint = if idx == block_index { hint } else { Hint::Low };
                let entry = self.block_cache.insert_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    HybridCacheProperties::default().with_hint(hint),
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

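    /// Reads a single block of `sst` through the hybrid block cache according to `policy` and
    /// returns a [`BlockResponse`] that may still be fetching.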
    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        // Future that fetches the block from the object store on a hybrid cache miss.
        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed to read {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(anyhow::Error::from(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(anyhow::Error::from)?,
                );
                Ok(block)
            }
        };

        if let Some(filter) = self.recent_filter.as_ref() {
            filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
        }

        match policy {
            CachePolicy::Fill(hint) => {
                let entry = self.block_cache.fetch_with_properties(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    },
                    HybridCacheProperties::default().with_hint(hint),
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    })
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

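    /// Reads a single block through [`Self::get_block_response`] and waits until it is available.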
    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        match self
            .get_block_response(sst, block_index, policy, stats)
            .await
        {
            Ok(block_response) => block_response.wait().await,
            Err(err) => Err(err),
        }
    }

    pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
        let object_id = object_id.into();
        let obj_prefix = self
            .store
            .get_object_prefix(object_id.inner(), self.use_new_object_prefix_strategy);
        risingwave_hummock_sdk::get_object_data_path(
            &obj_prefix,
            &self.path,
            HummockObjectId::Sstable(object_id),
        )
    }

    pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

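    /// Looks up the hybrid meta cache (memory and disk) for the sstable meta of `sst_obj_id`
    /// without fetching it from the object store.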
    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns a [`TableHolder`] for the sstable described by `sstable_info_ref`, fetching and
    /// decoding its meta from the object store on a meta cache miss.
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store.read(&meta_path, range).await?;
                let meta = SstableMeta::decode(&buf[..])?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_sst_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: impl Into<HummockSstableObjectId>,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

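    /// Opens a streaming read over the contiguous byte range covered by `metas` and wraps it in a
    /// [`BlockDataStream`].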
    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store().clone();
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result, this read request may be canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn data_recent_filter(
        &self,
    ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;

#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: u64 = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}
801}