1use std::clone::Clone;
15use std::collections::VecDeque;
16use std::future::Future;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::Bytes;
22use fail::fail_point;
23use foyer::{
24 Engine, EventListener, FetchState, Hint, HybridCache, HybridCacheBuilder, HybridCacheEntry,
25 HybridCacheProperties,
26};
27use futures::{StreamExt, future};
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29use risingwave_hummock_sdk::{HummockObjectId, HummockSstableObjectId, SST_OBJECT_SUFFIX};
30use risingwave_hummock_trace::TracedCachePolicy;
31use risingwave_object_store::object::{
32 ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
33};
34use serde::{Deserialize, Serialize};
35use thiserror_ext::AsReport;
36use tokio::time::Instant;
37
38use super::{
39 BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
40 SstableWriterOptions,
41};
42use crate::hummock::block_stream::{
43 BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
44};
45use crate::hummock::{BlockHolder, HummockError, HummockResult};
46use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};
47
48pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;
49
50#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
51pub struct SstableBlockIndex {
52 pub sst_id: HummockSstableObjectId,
53 pub block_idx: u64,
54}
55
56pub struct BlockCacheEventListener {
57 metrics: Arc<HummockStateStoreMetrics>,
58}
59
60impl BlockCacheEventListener {
61 pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
62 Self { metrics }
63 }
64}
65
66impl EventListener for BlockCacheEventListener {
67 type Key = SstableBlockIndex;
68 type Value = Box<Block>;
69
70 fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
71 where
72 Self::Key: foyer::Key,
73 Self::Value: foyer::Value,
74 {
75 self.metrics
76 .block_efficiency_histogram
77 .observe(value.efficiency());
78 }
79}
80
81#[derive(Clone, Copy, Eq, PartialEq)]
83pub enum CachePolicy {
84 Disable,
86 Fill(Hint),
88 NotFill,
90}
91
92impl Default for CachePolicy {
93 fn default() -> Self {
94 CachePolicy::Fill(Hint::Normal)
95 }
96}
97
98impl From<TracedCachePolicy> for CachePolicy {
99 fn from(policy: TracedCachePolicy) -> Self {
100 match policy {
101 TracedCachePolicy::Disable => Self::Disable,
102 TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
103 TracedCachePolicy::NotFill => Self::NotFill,
104 }
105 }
106}
107
108impl From<CachePolicy> for TracedCachePolicy {
109 fn from(policy: CachePolicy) -> Self {
110 match policy {
111 CachePolicy::Disable => Self::Disable,
112 CachePolicy::Fill(priority) => Self::Fill(priority.into()),
113 CachePolicy::NotFill => Self::NotFill,
114 }
115 }
116}
117
118pub struct SstableStoreConfig {
119 pub store: ObjectStoreRef,
120 pub path: String,
121
122 pub prefetch_buffer_capacity: usize,
123 pub max_prefetch_block_number: usize,
124 pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
125 pub state_store_metrics: Arc<HummockStateStoreMetrics>,
126 pub use_new_object_prefix_strategy: bool,
127
128 pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
129 pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
130}
131
132pub struct SstableStore {
133 path: String,
134 store: ObjectStoreRef,
135
136 meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
137 block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
138
139 recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
143 prefetch_buffer_usage: Arc<AtomicUsize>,
144 prefetch_buffer_capacity: usize,
145 max_prefetch_block_number: usize,
146 use_new_object_prefix_strategy: bool,
155}
156
157impl SstableStore {
158 pub fn new(config: SstableStoreConfig) -> Self {
159 Self {
163 path: config.path,
164 store: config.store,
165
166 meta_cache: config.meta_cache,
167 block_cache: config.block_cache,
168
169 recent_filter: config.recent_filter,
170 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
171 prefetch_buffer_capacity: config.prefetch_buffer_capacity,
172 max_prefetch_block_number: config.max_prefetch_block_number,
173 use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
174 }
175 }
176
177 #[expect(clippy::borrowed_box)]
180 pub async fn for_compactor(
181 store: ObjectStoreRef,
182 path: String,
183 block_cache_capacity: usize,
184 meta_cache_capacity: usize,
185 use_new_object_prefix_strategy: bool,
186 ) -> HummockResult<Self> {
187 let meta_cache = HybridCacheBuilder::new()
188 .memory(meta_cache_capacity)
189 .with_shards(1)
190 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
191 u64::BITS as usize / 8 + value.estimate_size()
192 })
193 .storage(Engine::Large)
194 .build()
195 .await
196 .map_err(HummockError::foyer_error)?;
197
198 let block_cache = HybridCacheBuilder::new()
199 .memory(block_cache_capacity)
200 .with_shards(1)
201 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
202 u64::BITS as usize * 2 / 8 + value.raw().len()
204 })
205 .storage(Engine::Large)
206 .build()
207 .await
208 .map_err(HummockError::foyer_error)?;
209
210 Ok(Self {
211 path,
212 store,
213
214 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
215 prefetch_buffer_capacity: block_cache_capacity,
216 max_prefetch_block_number: 16, recent_filter: None,
218 use_new_object_prefix_strategy,
219
220 meta_cache,
221 block_cache,
222 })
223 }
224
225 pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
226 self.store
227 .delete(self.get_sst_data_path(object_id).as_str())
228 .await?;
229 self.meta_cache.remove(&object_id);
230 Ok(())
232 }
233
234 pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
235 self.meta_cache.remove(&object_id);
236 Ok(())
237 }
238
239 pub(crate) async fn put_sst_data(
240 &self,
241 object_id: HummockSstableObjectId,
242 data: Bytes,
243 ) -> HummockResult<()> {
244 let data_path = self.get_sst_data_path(object_id);
245 self.store
246 .upload(&data_path, data)
247 .await
248 .map_err(Into::into)
249 }
250
251 pub async fn prefetch_blocks(
252 &self,
253 sst: &Sstable,
254 block_index: usize,
255 end_index: usize,
256 policy: CachePolicy,
257 stats: &mut StoreLocalStatistic,
258 ) -> HummockResult<Box<dyn BlockStream>> {
259 let object_id = sst.id;
260 if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
261 let block = self.get(sst, block_index, policy, stats).await?;
262 return Ok(Box::new(PrefetchBlockStream::new(
263 VecDeque::from([block]),
264 block_index,
265 None,
266 )));
267 }
268 stats.cache_data_block_total += 1;
269 if let Some(entry) = self
270 .block_cache
271 .get(&SstableBlockIndex {
272 sst_id: object_id,
273 block_idx: block_index as _,
274 })
275 .await
276 .map_err(HummockError::foyer_error)?
277 {
278 let block = BlockHolder::from_hybrid_cache_entry(entry);
279 return Ok(Box::new(PrefetchBlockStream::new(
280 VecDeque::from([block]),
281 block_index,
282 None,
283 )));
284 }
285 let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
286 let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
287 let start_offset = sst.meta.block_metas[block_index].offset as usize;
288 let mut min_hit_index = end_index;
289 let mut hit_count = 0;
290 for idx in block_index..end_index {
291 if self.block_cache.contains(&SstableBlockIndex {
292 sst_id: object_id,
293 block_idx: idx as _,
294 }) {
295 if min_hit_index > idx && idx > block_index {
296 min_hit_index = idx;
297 }
298 hit_count += 1;
299 }
300 }
301
302 if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
303 {
304 end_index = min_hit_index;
305 }
306 stats.cache_data_prefetch_count += 1;
307 stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
308 let end_offset = start_offset
309 + sst.meta.block_metas[block_index..end_index]
310 .iter()
311 .map(|meta| meta.len as usize)
312 .sum::<usize>();
313 let data_path = self.get_sst_data_path(object_id);
314 let memory_usage = end_offset - start_offset;
315 let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
316 let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
317 let store = self.store.clone();
318 let join_handle = tokio::spawn(async move {
319 store
320 .read(&data_path, start_offset..end_offset)
321 .instrument_await(span)
322 .await
323 });
324 let buf = match join_handle.await {
325 Ok(Ok(data)) => data,
326 Ok(Err(e)) => {
327 tracing::error!(
328 "prefetch meet error when read {}..{} from sst-{} ({})",
329 start_offset,
330 end_offset,
331 object_id,
332 sst.meta.estimated_size,
333 );
334 return Err(e.into());
335 }
336 Err(_) => {
337 return Err(HummockError::other("cancel by other thread"));
338 }
339 };
340 let mut offset = 0;
341 let mut blocks = VecDeque::default();
342 for idx in block_index..end_index {
343 let end = offset + sst.meta.block_metas[idx].len as usize;
344 if end > buf.len() {
345 return Err(ObjectError::internal("read unexpected EOF").into());
346 }
347 let block = Block::decode_with_copy(
349 buf.slice(offset..end),
350 sst.meta.block_metas[idx].uncompressed_size as usize,
351 true,
352 )?;
353 let holder = if let CachePolicy::Fill(hint) = policy {
354 let hint = if idx == block_index { hint } else { Hint::Low };
355 let entry = self.block_cache.insert_with_properties(
356 SstableBlockIndex {
357 sst_id: object_id,
358 block_idx: idx as _,
359 },
360 Box::new(block),
361 HybridCacheProperties::default().with_hint(hint),
362 );
363 BlockHolder::from_hybrid_cache_entry(entry)
364 } else {
365 BlockHolder::from_owned_block(Box::new(block))
366 };
367
368 blocks.push_back(holder);
369 offset = end;
370 }
371 Ok(Box::new(PrefetchBlockStream::new(
372 blocks,
373 block_index,
374 Some(tracker),
375 )))
376 }
377
378 pub async fn get_block_response(
379 &self,
380 sst: &Sstable,
381 block_index: usize,
382 policy: CachePolicy,
383 stats: &mut StoreLocalStatistic,
384 ) -> HummockResult<BlockResponse> {
385 let object_id = sst.id;
386 let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
387 let store = self.store.clone();
388
389 stats.cache_data_block_total += 1;
390 let file_size = sst.meta.estimated_size;
391 let data_path = self.get_sst_data_path(object_id);
392
393 let disable_cache: fn() -> bool = || {
394 fail_point!("disable_block_cache", |_| true);
395 false
396 };
397
398 let policy = if disable_cache() {
399 CachePolicy::Disable
400 } else {
401 policy
402 };
403
404 let fetch_block = move || {
406 let range = range.clone();
407
408 async move {
409 let block_data = match store
410 .read(&data_path, range.clone())
411 .instrument_await("get_block_response".verbose())
412 .await
413 {
414 Ok(data) => data,
415 Err(e) => {
416 tracing::error!(
417 "get_block_response meet error when read {:?} from sst-{}, total length: {}",
418 range,
419 object_id,
420 file_size
421 );
422 return Err(anyhow::Error::from(HummockError::from(e)));
423 }
424 };
425 let block = Box::new(
426 Block::decode(block_data, uncompressed_capacity)
427 .map_err(anyhow::Error::from)?,
428 );
429 Ok(block)
430 }
431 };
432
433 if let Some(filter) = self.recent_filter.as_ref() {
434 filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
435 }
436
437 match policy {
438 CachePolicy::Fill(hint) => {
439 let entry = self.block_cache.fetch_with_properties(
440 SstableBlockIndex {
441 sst_id: object_id,
442 block_idx: block_index as _,
443 },
444 HybridCacheProperties::default().with_hint(hint),
445 fetch_block,
446 );
447 if matches!(entry.state(), FetchState::Miss) {
448 stats.cache_data_block_miss += 1;
449 }
450 Ok(BlockResponse::Entry(entry))
451 }
452 CachePolicy::NotFill => {
453 match self
454 .block_cache
455 .get(&SstableBlockIndex {
456 sst_id: object_id,
457 block_idx: block_index as _,
458 })
459 .await
460 .map_err(HummockError::foyer_error)?
461 {
462 Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
463 entry,
464 ))),
465 _ => {
466 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
467 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
468 }
469 }
470 }
471 CachePolicy::Disable => {
472 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
473 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
474 }
475 }
476 }
477
478 pub async fn get(
479 &self,
480 sst: &Sstable,
481 block_index: usize,
482 policy: CachePolicy,
483 stats: &mut StoreLocalStatistic,
484 ) -> HummockResult<BlockHolder> {
485 match self
486 .get_block_response(sst, block_index, policy, stats)
487 .await
488 {
489 Ok(block_response) => block_response.wait().await,
490 Err(err) => Err(err),
491 }
492 }
493
494 pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
495 let object_id = object_id.into();
496 let obj_prefix = self
497 .store
498 .get_object_prefix(object_id.inner(), self.use_new_object_prefix_strategy);
499 risingwave_hummock_sdk::get_object_data_path(
500 &obj_prefix,
501 &self.path,
502 HummockObjectId::Sstable(object_id),
503 )
504 }
505
506 pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
507 risingwave_hummock_sdk::get_object_id_from_path(path)
508 }
509
510 pub fn store(&self) -> ObjectStoreRef {
511 self.store.clone()
512 }
513
/// Clears the block cache. Only available in tests or with the `test` feature.
#[cfg(any(test, feature = "test"))]
pub async fn clear_block_cache(&self) -> HummockResult<()> {
    let cleared = self.block_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
521
/// Clears the meta cache. Only available in tests or with the `test` feature.
#[cfg(any(test, feature = "test"))]
pub async fn clear_meta_cache(&self) -> HummockResult<()> {
    let cleared = self.meta_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
529
530 pub async fn sstable_cached(
531 &self,
532 sst_obj_id: HummockSstableObjectId,
533 ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
534 self.meta_cache
535 .get(&sst_obj_id)
536 .await
537 .map_err(HummockError::foyer_error)
538 }
539
540 pub fn sstable(
542 &self,
543 sstable_info_ref: &SstableInfo,
544 stats: &mut StoreLocalStatistic,
545 ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
546 let object_id = sstable_info_ref.object_id;
547
548 let entry = self.meta_cache.fetch(object_id, || {
549 let store = self.store.clone();
550 let meta_path = self.get_sst_data_path(object_id);
551 let stats_ptr = stats.remote_io_time.clone();
552 let range = sstable_info_ref.meta_offset as usize..;
553 async move {
554 let now = Instant::now();
555 let buf = store.read(&meta_path, range).await?;
556 let meta = SstableMeta::decode(&buf[..])?;
557
558 let sst = Sstable::new(object_id, meta);
559 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
560 stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
561 Ok(Box::new(sst))
562 }
563 });
564
565 if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
566 stats.cache_meta_block_miss += 1;
567 }
568
569 stats.cache_meta_block_total += 1;
570
571 async move { entry.await.map_err(HummockError::foyer_error) }
572 }
573
574 pub async fn list_sst_object_metadata_from_object_store(
575 &self,
576 prefix: Option<String>,
577 start_after: Option<String>,
578 limit: Option<usize>,
579 ) -> HummockResult<ObjectMetadataIter> {
580 let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
581 let raw_iter = self.store.list(&list_path, start_after, limit).await?;
582 let iter = raw_iter.filter(|r| match r {
583 Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
584 Err(_) => future::ready(true),
585 });
586 Ok(Box::pin(iter))
587 }
588
589 pub fn create_sst_writer(
590 self: Arc<Self>,
591 object_id: impl Into<HummockSstableObjectId>,
592 options: SstableWriterOptions,
593 ) -> BatchUploadWriter {
594 BatchUploadWriter::new(object_id, self, options)
595 }
596
597 pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
598 let sst = Sstable::new(object_id, meta);
599 self.meta_cache.insert(object_id, Box::new(sst));
600 }
601
602 pub fn insert_block_cache(
603 &self,
604 object_id: HummockSstableObjectId,
605 block_index: u64,
606 block: Box<Block>,
607 ) {
608 self.block_cache.insert(
609 SstableBlockIndex {
610 sst_id: object_id,
611 block_idx: block_index,
612 },
613 block,
614 );
615 }
616
617 pub fn get_prefetch_memory_usage(&self) -> usize {
618 self.prefetch_buffer_usage.load(Ordering::Acquire)
619 }
620
621 pub async fn get_stream_for_blocks(
622 &self,
623 object_id: HummockSstableObjectId,
624 metas: &[BlockMeta],
625 ) -> HummockResult<BlockDataStream> {
626 fail_point!("get_stream_err");
627 let data_path = self.get_sst_data_path(object_id);
628 let store = self.store().clone();
629 let block_meta = &metas[0];
630 let start_pos = block_meta.offset as usize;
631 let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
632 let range = start_pos..end_pos;
633 let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;
635
636 let reader = match ret {
637 Ok(Ok(reader)) => reader,
638 Ok(Err(e)) => return Err(HummockError::from(e)),
639 Err(e) => {
640 return Err(HummockError::other(format!(
641 "failed to get result, this read request may be canceled: {}",
642 e.as_report()
643 )));
644 }
645 };
646 Ok(BlockDataStream::new(reader, metas.to_vec()))
647 }
648
649 pub fn data_recent_filter(
650 &self,
651 ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
652 self.recent_filter.as_ref()
653 }
654
655 pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
656 &self.meta_cache
657 }
658
659 pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
660 &self.block_cache
661 }
662
663 pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
664 self.recent_filter.as_ref()
665 }
666
667 pub async fn create_streaming_uploader(
668 &self,
669 path: &str,
670 ) -> ObjectResult<ObjectStreamingUploader> {
671 self.store.streaming_upload(path).await
672 }
673}
674
675pub type SstableStoreRef = Arc<SstableStore>;
#[cfg(test)]
mod tests {
    use std::ops::Range;
    use std::sync::Arc;

    use risingwave_hummock_sdk::HummockObjectId;
    use risingwave_hummock_sdk::sstable_info::SstableInfo;

    use super::{SstableStoreRef, SstableWriterOptions};
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
    use crate::hummock::sstable::SstableIteratorReadOptions;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_data, put_sst,
    };
    use crate::hummock::value::HummockValue;
    use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
    use crate::monitor::StoreLocalStatistic;

    /// Object id used for the SST uploaded by the upload tests.
    const SST_ID: u64 = 1;

    /// Value stored under the `x`-th test key.
    fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
        HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
    }

    /// Checks that the SST described by `info` can be loaded twice with the expected meta
    /// and that iterating it yields the test keys and values of `x_range` in order.
    async fn validate_sst(
        sstable_store: SstableStoreRef,
        info: &SstableInfo,
        mut meta: SstableMeta,
        x_range: Range<usize>,
    ) {
        let mut stats = StoreLocalStatistic::default();
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        // The expected meta is compared with its bloom filter cleared; presumably the
        // loaded `Sstable` does not keep the filter inside `meta` (TODO confirm).
        std::mem::take(&mut meta.bloom_filter);
        assert_eq!(holder.meta, meta);
        // The second load goes through `sstable` again and must yield the same meta.
        let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
        assert_eq!(holder.meta, meta);
        let mut iter = SstableIterator::new(
            holder,
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
            info,
        );
        iter.rewind().await.unwrap();
        for i in x_range {
            let key = iter.key();
            let value = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(value, get_hummock_value(i).as_slice());
            iter.next().await.unwrap();
        }
    }

    /// Generates an SST holding the test entries of `x_range`, uploads it with caching
    /// disabled into a fresh mock store and validates that it reads back correctly.
    async fn upload_and_validate(x_range: Range<usize>) {
        let sstable_store = mock_sstable_store().await;
        let (data, meta) = gen_test_sstable_data(
            default_builder_opt_for_test(),
            x_range
                .clone()
                .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
        )
        .await;
        let writer_opts = SstableWriterOptions {
            capacity_hint: None,
            tracker: None,
            policy: CachePolicy::Disable,
        };
        let info = put_sst(
            SST_ID,
            data,
            meta.clone(),
            sstable_store.clone(),
            writer_opts,
            vec![0],
        )
        .await
        .unwrap();

        validate_sst(sstable_store, &info, meta, x_range).await;
    }

    #[tokio::test]
    async fn test_batch_upload() {
        upload_and_validate(0..100).await;
    }

    /// Runs the same scenario as `test_batch_upload`; both tests upload through `put_sst`.
    #[tokio::test]
    async fn test_streaming_upload() {
        upload_and_validate(0..100).await;
    }

    #[tokio::test]
    async fn test_basic() {
        let sstable_store = mock_sstable_store().await;
        let object_id = 123;
        let data_path = sstable_store.get_sst_data_path(object_id);
        assert_eq!(data_path, "test/123.data");
        // Parsing the path must yield the object id it was built from.
        assert_eq!(
            SstableStore::get_object_id_from_path(&data_path),
            HummockObjectId::Sstable(object_id.into())
        );
    }
}