risingwave_storage/hummock/sstable_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::clone::Clone;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use fail::fail_point;
use foyer::{
    CacheHint, Engine, EventListener, FetchState, HybridCache, HybridCacheBuilder, HybridCacheEntry,
};
use futures::{StreamExt, future};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
    ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio::time::Instant;

use super::{
    BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
    SstableWriterOptions,
};
use crate::hummock::block_stream::{
    BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
};
use crate::hummock::{BlockHolder, HummockError, HummockResult};
use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};

pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    pub sst_id: HummockSstableObjectId,
    pub block_idx: u64,
}

pub struct BlockCacheEventListener {
    metrics: Arc<HummockStateStoreMetrics>,
}

impl BlockCacheEventListener {
    pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
        Self { metrics }
    }
}

impl EventListener for BlockCacheEventListener {
    type Key = SstableBlockIndex;
    type Value = Box<Block>;

    fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
    where
        Self::Key: foyer::Key,
        Self::Value: foyer::Value,
    {
        self.metrics
            .block_efficiency_histogram
            .observe(value.efficiency());
    }
}

// TODO: Define policy based on use cases (read / compaction / ...).
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Disable the read cache and do not fill the cache afterwards.
    Disable,
    /// Try reading from the cache and fill the cache afterwards.
    Fill(CacheHint),
    /// Read from the cache but do not fill the cache afterwards.
    NotFill,
}

impl Default for CachePolicy {
    fn default() -> Self {
        CachePolicy::Fill(CacheHint::Normal)
    }
}

impl From<TracedCachePolicy> for CachePolicy {
    fn from(policy: TracedCachePolicy) -> Self {
        match policy {
            TracedCachePolicy::Disable => Self::Disable,
            TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
            TracedCachePolicy::NotFill => Self::NotFill,
        }
    }
}

impl From<CachePolicy> for TracedCachePolicy {
    fn from(policy: CachePolicy) -> Self {
        match policy {
            CachePolicy::Disable => Self::Disable,
            CachePolicy::Fill(priority) => Self::Fill(priority.into()),
            CachePolicy::NotFill => Self::NotFill,
        }
    }
}

pub struct SstableStoreConfig {
    pub store: ObjectStoreRef,
    pub path: String,

    pub prefetch_buffer_capacity: usize,
    pub max_prefetch_block_number: usize,
    pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub use_new_object_prefix_strategy: bool,

    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
}

pub struct SstableStore {
    path: String,
    store: ObjectStoreRef,

    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    /// Recent filter for `(sst_obj_id, blk_idx)`.
    ///
    /// `blk_idx == usize::MAX` stands for an `sst_obj_id`-only entry.
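    /// For example, `get_block_response` records both `(object_id, usize::MAX)` and
    /// `(object_id, block_index)` when it reads a block.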
    recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    prefetch_buffer_usage: Arc<AtomicUsize>,
    prefetch_buffer_capacity: usize,
    max_prefetch_block_number: usize,
    /// Whether the object store is divided into prefixes depends on two factors:
    ///   1. The specific object store type.
    ///   2. Whether the cluster is newly created.
    ///
    /// The value of `use_new_object_prefix_strategy` is taken from the field of the same name in
    /// the system parameters: it is `true` for a new cluster and `false` for an old cluster.
    /// The final decision on whether to divide prefixes combines this field with the specific
    /// object store type; this keeps the object layout backward compatible for old clusters.
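    /// See `get_sst_data_path` for where this flag is consulted.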
    use_new_object_prefix_strategy: bool,
}

impl SstableStore {
    pub fn new(config: SstableStoreConfig) -> Self {
        // TODO: We should validate the path early. Otherwise the object store won't report an
        // invalid path error until the first write attempt.

        Self {
            path: config.path,
            store: config.store,

            meta_cache: config.meta_cache,
            block_cache: config.block_cache,

            recent_filter: config.recent_filter,
            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: config.prefetch_buffer_capacity,
            max_prefetch_block_number: config.max_prefetch_block_number,
            use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
        }
    }

    /// For the compactor, we do not need high cache concurrency. Instead, we need the cache to
    /// be evicted more effectively.
    #[expect(clippy::borrowed_box)]
    pub async fn for_compactor(
        store: ObjectStoreRef,
        path: String,
        block_cache_capacity: usize,
        meta_cache_capacity: usize,
        use_new_object_prefix_strategy: bool,
    ) -> HummockResult<Self> {
        let meta_cache = HybridCacheBuilder::new()
            .memory(meta_cache_capacity)
            .with_shards(1)
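            // Weight an entry by the size of its `u64` key plus the estimated in-memory size of
            // the cached `Sstable` metadata.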
            .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
                u64::BITS as usize / 8 + value.estimate_size()
            })
            .storage(Engine::Large)
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        let block_cache = HybridCacheBuilder::new()
            .memory(block_cache_capacity)
            .with_shards(1)
            .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
                // FIXME(MrCroxx): Calculate block weight more accurately.
                u64::BITS as usize * 2 / 8 + value.raw().len()
            })
            .storage(Engine::Large)
            .build()
            .await
            .map_err(HummockError::foyer_error)?;

        Ok(Self {
            path,
            store,

            prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
            prefetch_buffer_capacity: block_cache_capacity,
            max_prefetch_block_number: 16, /* compactor won't use this parameter, so just assign a default value. */
            recent_filter: None,
            use_new_object_prefix_strategy,

            meta_cache,
            block_cache,
        })
    }

    pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.store
            .delete(self.get_sst_data_path(object_id).as_str())
            .await?;
        self.meta_cache.remove(&object_id);
        // TODO(MrCroxx): support group remove in foyer.
        Ok(())
    }

    pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
        self.meta_cache.remove(&object_id);
        Ok(())
    }

    pub(crate) async fn put_sst_data(
        &self,
        object_id: HummockSstableObjectId,
        data: Bytes,
    ) -> HummockResult<()> {
        let data_path = self.get_sst_data_path(object_id);
        self.store
            .upload(&data_path, data)
            .await
            .map_err(Into::into)
    }

    pub async fn prefetch_blocks(
        &self,
        sst: &Sstable,
        block_index: usize,
        end_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<Box<dyn BlockStream>> {
        let object_id = sst.id;
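        // If the prefetch buffer is already over capacity, skip prefetching and fall back to
        // fetching only the requested block.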
        if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
            let block = self.get(sst, block_index, policy, stats).await?;
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        stats.cache_data_block_total += 1;
        if let Some(entry) = self
            .block_cache
            .get(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index as _,
            })
            .await
            .map_err(HummockError::foyer_error)?
        {
            let block = BlockHolder::from_hybrid_cache_entry(entry);
            return Ok(Box::new(PrefetchBlockStream::new(
                VecDeque::from([block]),
                block_index,
                None,
            )));
        }
        let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
        let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
        let start_offset = sst.meta.block_metas[block_index].offset as usize;
        let mut min_hit_index = end_index;
        let mut hit_count = 0;
        for idx in block_index..end_index {
            if self.block_cache.contains(&SstableBlockIndex {
                sst_id: object_id,
                block_idx: idx as _,
            }) {
                if min_hit_index > idx && idx > block_index {
                    min_hit_index = idx;
                }
                hit_count += 1;
            }
        }

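        // Shrink the prefetch window to end at the first cached block when a sizeable portion
        // (>= 1/3) of the range is already cached, or when that first cached block lies in the
        // second half of the range. If no later block is cached, `min_hit_index == end_index`
        // and nothing changes.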
        if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
        {
            end_index = min_hit_index;
        }
        stats.cache_data_prefetch_count += 1;
        stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
        let end_offset = start_offset
            + sst.meta.block_metas[block_index..end_index]
                .iter()
                .map(|meta| meta.len as usize)
                .sum::<usize>();
        let data_path = self.get_sst_data_path(object_id);
        let memory_usage = end_offset - start_offset;
        let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
        let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
        let store = self.store.clone();
        let join_handle = tokio::spawn(async move {
            store
                .read(&data_path, start_offset..end_offset)
                .instrument_await(span)
                .await
        });
        let buf = match join_handle.await {
            Ok(Ok(data)) => data,
            Ok(Err(e)) => {
                tracing::error!(
                    "prefetch failed to read {}..{} from sst-{} (estimated size: {})",
                    start_offset,
                    end_offset,
                    object_id,
                    sst.meta.estimated_size,
                );
                return Err(e.into());
            }
            Err(_) => {
                return Err(HummockError::other("canceled by another thread"));
            }
        };
        let mut offset = 0;
        let mut blocks = VecDeque::default();
        for idx in block_index..end_index {
            let end = offset + sst.meta.block_metas[idx].len as usize;
            if end > buf.len() {
                return Err(ObjectError::internal("read unexpected EOF").into());
            }
            // Copy the data out again so the block does not keep the large prefetch buffer alive in memory.
            let block = Block::decode_with_copy(
                buf.slice(offset..end),
                sst.meta.block_metas[idx].uncompressed_size as usize,
                true,
            )?;
            let holder = if let CachePolicy::Fill(priority) = policy {
                let cache_priority = if idx == block_index {
                    priority
                } else {
                    CacheHint::Low
                };
                let entry = self.block_cache.insert_with_hint(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: idx as _,
                    },
                    Box::new(block),
                    cache_priority,
                );
                BlockHolder::from_hybrid_cache_entry(entry)
            } else {
                BlockHolder::from_owned_block(Box::new(block))
            };

            blocks.push_back(holder);
            offset = end;
        }
        Ok(Box::new(PrefetchBlockStream::new(
            blocks,
            block_index,
            Some(tracker),
        )))
    }

    pub async fn get_block_response(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockResponse> {
        let object_id = sst.id;
        let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
        let store = self.store.clone();

        stats.cache_data_block_total += 1;
        let file_size = sst.meta.estimated_size;
        let data_path = self.get_sst_data_path(object_id);

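        // Fail point used by tests to force-disable the block cache for this read.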
        let disable_cache: fn() -> bool = || {
            fail_point!("disable_block_cache", |_| true);
            false
        };

        let policy = if disable_cache() {
            CachePolicy::Disable
        } else {
            policy
        };

        // `fetch_block` builds the future that reads the block from the object store on a hybrid cache miss.
        let fetch_block = move || {
            let range = range.clone();

            async move {
                let block_data = match store
                    .read(&data_path, range.clone())
                    .instrument_await("get_block_response".verbose())
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        tracing::error!(
                            "get_block_response failed to read {:?} from sst-{}, total length: {}",
                            range,
                            object_id,
                            file_size
                        );
                        return Err(anyhow::Error::from(HummockError::from(e)));
                    }
                };
                let block = Box::new(
                    Block::decode(block_data, uncompressed_capacity)
                        .map_err(anyhow::Error::from)?,
                );
                Ok(block)
            }
        };

        if let Some(filter) = self.recent_filter.as_ref() {
            filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
        }

        match policy {
            CachePolicy::Fill(context) => {
                let entry = self.block_cache.fetch_with_hint(
                    SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    },
                    context,
                    fetch_block,
                );
                if matches!(entry.state(), FetchState::Miss) {
                    stats.cache_data_block_miss += 1;
                }
                Ok(BlockResponse::Entry(entry))
            }
            CachePolicy::NotFill => {
                match self
                    .block_cache
                    .get(&SstableBlockIndex {
                        sst_id: object_id,
                        block_idx: block_index as _,
                    })
                    .await
                    .map_err(HummockError::foyer_error)?
                {
                    Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
                        entry,
                    ))),
                    _ => {
                        let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                        Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
                    }
                }
            }
            CachePolicy::Disable => {
                let block = fetch_block().await.map_err(HummockError::foyer_error)?;
                Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
            }
        }
    }

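    /// Gets a single block, consulting the hybrid block cache according to `policy`.
    ///
    /// A minimal usage sketch (assuming an existing `store: &SstableStore`, `sst: &Sstable`, and
    /// `stats: &mut StoreLocalStatistic`):
    ///
    /// ```ignore
    /// let block = store.get(sst, 0, CachePolicy::NotFill, &mut stats).await?;
    /// ```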
    pub async fn get(
        &self,
        sst: &Sstable,
        block_index: usize,
        policy: CachePolicy,
        stats: &mut StoreLocalStatistic,
    ) -> HummockResult<BlockHolder> {
        match self
            .get_block_response(sst, block_index, policy, stats)
            .await
        {
            Ok(block_response) => block_response.wait().await,
            Err(err) => Err(err),
        }
    }

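    /// Builds the object path of the SST data file from the store path, the object prefix, and
    /// the object id plus `OBJECT_SUFFIX` (e.g. `test/123.data`, as exercised by `test_basic` below).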
    pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String {
        let obj_prefix = self
            .store
            .get_object_prefix(object_id, self.use_new_object_prefix_strategy);
        risingwave_hummock_sdk::get_sst_data_path(&obj_prefix, &self.path, object_id)
    }

    pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId {
        risingwave_hummock_sdk::get_object_id_from_path(path)
    }

    pub fn store(&self) -> ObjectStoreRef {
        self.store.clone()
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_block_cache(&self) -> HummockResult<()> {
        self.block_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn clear_meta_cache(&self) -> HummockResult<()> {
        self.meta_cache
            .clear()
            .await
            .map_err(HummockError::foyer_error)
    }

    pub async fn sstable_cached(
        &self,
        sst_obj_id: HummockSstableObjectId,
    ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
        self.meta_cache
            .get(&sst_obj_id)
            .await
            .map_err(HummockError::foyer_error)
    }

    /// Returns a future that resolves to the [`TableHolder`] for the given SST, fetching the
    /// metadata through the meta cache on a miss.
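    ///
    /// A minimal usage sketch (assuming an existing `store: &SstableStore`, `info: &SstableInfo`,
    /// and `stats: &mut StoreLocalStatistic`):
    ///
    /// ```ignore
    /// let holder = store.sstable(info, &mut stats).await?;
    /// ```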
    pub fn sstable(
        &self,
        sstable_info_ref: &SstableInfo,
        stats: &mut StoreLocalStatistic,
    ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
        let object_id = sstable_info_ref.object_id;

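        // Issue the meta-cache fetch eagerly so the miss counters below are updated before the
        // caller awaits the returned future.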
        let entry = self.meta_cache.fetch(object_id, || {
            let store = self.store.clone();
            let meta_path = self.get_sst_data_path(object_id);
            let stats_ptr = stats.remote_io_time.clone();
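            // The SST metadata is stored at the tail of the object, so read from `meta_offset`
            // to the end.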
            let range = sstable_info_ref.meta_offset as usize..;
            async move {
                let now = Instant::now();
                let buf = store.read(&meta_path, range).await?;
                let meta = SstableMeta::decode(&buf[..])?;

                let sst = Sstable::new(object_id, meta);
                let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
                stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
                Ok(Box::new(sst))
            }
        });

        if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
            stats.cache_meta_block_miss += 1;
        }

        stats.cache_meta_block_total += 1;

        async move { entry.await.map_err(HummockError::foyer_error) }
    }

    pub async fn list_object_metadata_from_object_store(
        &self,
        prefix: Option<String>,
        start_after: Option<String>,
        limit: Option<usize>,
    ) -> HummockResult<ObjectMetadataIter> {
        let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
        let raw_iter = self.store.list(&list_path, start_after, limit).await?;
        let iter = raw_iter.filter(|r| match r {
            Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))),
            Err(_) => future::ready(true),
        });
        Ok(Box::pin(iter))
    }

    pub fn create_sst_writer(
        self: Arc<Self>,
        object_id: HummockSstableObjectId,
        options: SstableWriterOptions,
    ) -> BatchUploadWriter {
        BatchUploadWriter::new(object_id, self, options)
    }

    pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
        let sst = Sstable::new(object_id, meta);
        self.meta_cache.insert(object_id, Box::new(sst));
    }

    pub fn insert_block_cache(
        &self,
        object_id: HummockSstableObjectId,
        block_index: u64,
        block: Box<Block>,
    ) {
        self.block_cache.insert(
            SstableBlockIndex {
                sst_id: object_id,
                block_idx: block_index,
            },
            block,
        );
    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }

    pub async fn get_stream_for_blocks(
        &self,
        object_id: HummockSstableObjectId,
        metas: &[BlockMeta],
    ) -> HummockResult<BlockDataStream> {
        fail_point!("get_stream_err");
        let data_path = self.get_sst_data_path(object_id);
        let store = self.store().clone();
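        // Read one contiguous range that covers all the requested block metas.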
        let block_meta = &metas[0];
        let start_pos = block_meta.offset as usize;
        let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
        let range = start_pos..end_pos;
        // Spawn onto the tokio pool because the object-storage SDK may not be cancellation-safe.
        let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;

        let reader = match ret {
            Ok(Ok(reader)) => reader,
            Ok(Err(e)) => return Err(HummockError::from(e)),
            Err(e) => {
                return Err(HummockError::other(format!(
                    "failed to get result; this read request may have been canceled: {}",
                    e.as_report()
                )));
            }
        };
        Ok(BlockDataStream::new(reader, metas.to_vec()))
    }

    pub fn data_recent_filter(
        &self,
    ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
        &self.meta_cache
    }

    pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
        &self.block_cache
    }

    pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
        self.recent_filter.as_ref()
    }

    pub async fn create_streaming_uploader(
        &self,
        path: &str,
    ) -> ObjectResult<ObjectStreamingUploader> {
        self.store.streaming_upload(path).await
    }
}

pub type SstableStoreRef = Arc<SstableStore>;
#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockSstableObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    const SST_ID: HummockSstableObjectId = 1;

    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_batch_upload() {
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_streaming_upload() {
        // Generate test data.
        let sstable_store = mock_sstable_store().await;
        let x_range = 0..100;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data.clone(),
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        assert_eq!(SstableStore::get_object_id_from_path(&data_path), object_id);
    }
}
796}