1use std::clone::Clone;
15use std::collections::VecDeque;
16use std::future::Future;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::Bytes;
22use fail::fail_point;
23use foyer::{
24 CacheHint, Engine, EventListener, FetchState, HybridCache, HybridCacheBuilder, HybridCacheEntry,
25};
26use futures::{StreamExt, future};
27use risingwave_hummock_sdk::sstable_info::SstableInfo;
28use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX};
29use risingwave_hummock_trace::TracedCachePolicy;
30use risingwave_object_store::object::{
31 ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
32};
33use serde::{Deserialize, Serialize};
34use thiserror_ext::AsReport;
35use tokio::time::Instant;
36
37use super::{
38 BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
39 SstableWriterOptions,
40};
41use crate::hummock::block_stream::{
42 BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
43};
44use crate::hummock::{BlockHolder, HummockError, HummockResult};
45use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};
46
/// Handle to an [`Sstable`] entry of the hybrid meta cache, keyed by SST object id.
pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;
48
/// Key of the block cache: identifies a single block inside a single SST object.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    /// Object id of the SST the block belongs to.
    pub sst_id: HummockSstableObjectId,
    /// Index of the block within the SST's block list.
    pub block_idx: u64,
}
54
/// Block cache event listener that reports block efficiency to the state store metrics.
pub struct BlockCacheEventListener {
    /// Metrics object that owns `block_efficiency_histogram`.
    metrics: Arc<HummockStateStoreMetrics>,
}
58
59impl BlockCacheEventListener {
60 pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
61 Self { metrics }
62 }
63}
64
65impl EventListener for BlockCacheEventListener {
66 type Key = SstableBlockIndex;
67 type Value = Box<Block>;
68
69 fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
70 where
71 Self::Key: foyer::Key,
72 Self::Value: foyer::Value,
73 {
74 self.metrics
75 .block_efficiency_histogram
76 .observe(value.efficiency());
77 }
78}
79
/// How a block read interacts with the block cache.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Bypass the block cache: read the block from the object store and do not insert it.
    Disable,
    /// Read through the block cache and fill it on a miss, using the given hint.
    Fill(CacheHint),
    /// Look the block up in the cache, but do not insert it when it has to be fetched.
    NotFill,
}
90
91impl Default for CachePolicy {
92 fn default() -> Self {
93 CachePolicy::Fill(CacheHint::Normal)
94 }
95}
96
97impl From<TracedCachePolicy> for CachePolicy {
98 fn from(policy: TracedCachePolicy) -> Self {
99 match policy {
100 TracedCachePolicy::Disable => Self::Disable,
101 TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
102 TracedCachePolicy::NotFill => Self::NotFill,
103 }
104 }
105}
106
107impl From<CachePolicy> for TracedCachePolicy {
108 fn from(policy: CachePolicy) -> Self {
109 match policy {
110 CachePolicy::Disable => Self::Disable,
111 CachePolicy::Fill(priority) => Self::Fill(priority.into()),
112 CachePolicy::NotFill => Self::NotFill,
113 }
114 }
115}
116
/// Everything needed to assemble an [`SstableStore`] through [`SstableStore::new`].
pub struct SstableStoreConfig {
    /// Object store that holds the SST data objects.
    pub store: ObjectStoreRef,
    /// Path component used when building SST data paths and as the root for object listing.
    pub path: String,

    /// Threshold for outstanding prefetch memory; once usage exceeds it, `prefetch_blocks`
    /// falls back to reading a single block.
    pub prefetch_buffer_capacity: usize,
    /// Upper bound on the number of blocks a single `prefetch_blocks` call may read.
    pub max_prefetch_block_number: usize,
    /// Optional filter that is fed `(object id, block index)` pairs on block reads.
    pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    /// NOTE(review): not read by `SstableStore::new` in this file — confirm it is still needed.
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// Forwarded to the object store's `get_object_prefix` when building SST data paths.
    pub use_new_object_prefix_strategy: bool,

    /// Cache of decoded SSTs, keyed by object id.
    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    /// Cache of decoded blocks, keyed by [`SstableBlockIndex`].
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
}
130
/// Access layer for SST objects: wraps an object store with a meta cache, a block cache and
/// block prefetching.
pub struct SstableStore {
    /// Path component used when building SST data paths and as the root for object listing.
    path: String,
    /// Object store that holds the SST data objects.
    store: ObjectStoreRef,

    /// Cache of decoded SSTs, keyed by object id.
    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    /// Cache of decoded blocks, keyed by [`SstableBlockIndex`].
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    /// Optional filter that is fed `(object id, block index)` pairs on block reads.
    recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    /// Shared counter of memory currently attributed to prefetched data; handed to every
    /// `MemoryUsageTracker` created by `prefetch_blocks`.
    prefetch_buffer_usage: Arc<AtomicUsize>,
    /// Threshold for `prefetch_buffer_usage` above which prefetching is skipped.
    prefetch_buffer_capacity: usize,
    /// Upper bound on the number of blocks a single prefetch may read.
    max_prefetch_block_number: usize,
    /// Forwarded to the object store's `get_object_prefix` when building SST data paths.
    use_new_object_prefix_strategy: bool,
}
155
156impl SstableStore {
157 pub fn new(config: SstableStoreConfig) -> Self {
158 Self {
162 path: config.path,
163 store: config.store,
164
165 meta_cache: config.meta_cache,
166 block_cache: config.block_cache,
167
168 recent_filter: config.recent_filter,
169 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
170 prefetch_buffer_capacity: config.prefetch_buffer_capacity,
171 max_prefetch_block_number: config.max_prefetch_block_number,
172 use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
173 }
174 }
175
176 #[expect(clippy::borrowed_box)]
179 pub async fn for_compactor(
180 store: ObjectStoreRef,
181 path: String,
182 block_cache_capacity: usize,
183 meta_cache_capacity: usize,
184 use_new_object_prefix_strategy: bool,
185 ) -> HummockResult<Self> {
186 let meta_cache = HybridCacheBuilder::new()
187 .memory(meta_cache_capacity)
188 .with_shards(1)
189 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
190 u64::BITS as usize / 8 + value.estimate_size()
191 })
192 .storage(Engine::Large)
193 .build()
194 .await
195 .map_err(HummockError::foyer_error)?;
196
197 let block_cache = HybridCacheBuilder::new()
198 .memory(block_cache_capacity)
199 .with_shards(1)
200 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
201 u64::BITS as usize * 2 / 8 + value.raw().len()
203 })
204 .storage(Engine::Large)
205 .build()
206 .await
207 .map_err(HummockError::foyer_error)?;
208
209 Ok(Self {
210 path,
211 store,
212
213 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
214 prefetch_buffer_capacity: block_cache_capacity,
215 max_prefetch_block_number: 16, recent_filter: None,
217 use_new_object_prefix_strategy,
218
219 meta_cache,
220 block_cache,
221 })
222 }
223
224 pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
225 self.store
226 .delete(self.get_sst_data_path(object_id).as_str())
227 .await?;
228 self.meta_cache.remove(&object_id);
229 Ok(())
231 }
232
233 pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
234 self.meta_cache.remove(&object_id);
235 Ok(())
236 }
237
238 pub(crate) async fn put_sst_data(
239 &self,
240 object_id: HummockSstableObjectId,
241 data: Bytes,
242 ) -> HummockResult<()> {
243 let data_path = self.get_sst_data_path(object_id);
244 self.store
245 .upload(&data_path, data)
246 .await
247 .map_err(Into::into)
248 }
249
250 pub async fn prefetch_blocks(
251 &self,
252 sst: &Sstable,
253 block_index: usize,
254 end_index: usize,
255 policy: CachePolicy,
256 stats: &mut StoreLocalStatistic,
257 ) -> HummockResult<Box<dyn BlockStream>> {
258 let object_id = sst.id;
259 if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
260 let block = self.get(sst, block_index, policy, stats).await?;
261 return Ok(Box::new(PrefetchBlockStream::new(
262 VecDeque::from([block]),
263 block_index,
264 None,
265 )));
266 }
267 stats.cache_data_block_total += 1;
268 if let Some(entry) = self
269 .block_cache
270 .get(&SstableBlockIndex {
271 sst_id: object_id,
272 block_idx: block_index as _,
273 })
274 .await
275 .map_err(HummockError::foyer_error)?
276 {
277 let block = BlockHolder::from_hybrid_cache_entry(entry);
278 return Ok(Box::new(PrefetchBlockStream::new(
279 VecDeque::from([block]),
280 block_index,
281 None,
282 )));
283 }
284 let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
285 let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
286 let start_offset = sst.meta.block_metas[block_index].offset as usize;
287 let mut min_hit_index = end_index;
288 let mut hit_count = 0;
289 for idx in block_index..end_index {
290 if self.block_cache.contains(&SstableBlockIndex {
291 sst_id: object_id,
292 block_idx: idx as _,
293 }) {
294 if min_hit_index > idx && idx > block_index {
295 min_hit_index = idx;
296 }
297 hit_count += 1;
298 }
299 }
300
301 if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
302 {
303 end_index = min_hit_index;
304 }
305 stats.cache_data_prefetch_count += 1;
306 stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
307 let end_offset = start_offset
308 + sst.meta.block_metas[block_index..end_index]
309 .iter()
310 .map(|meta| meta.len as usize)
311 .sum::<usize>();
312 let data_path = self.get_sst_data_path(object_id);
313 let memory_usage = end_offset - start_offset;
314 let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
315 let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
316 let store = self.store.clone();
317 let join_handle = tokio::spawn(async move {
318 store
319 .read(&data_path, start_offset..end_offset)
320 .instrument_await(span)
321 .await
322 });
323 let buf = match join_handle.await {
324 Ok(Ok(data)) => data,
325 Ok(Err(e)) => {
326 tracing::error!(
327 "prefetch meet error when read {}..{} from sst-{} ({})",
328 start_offset,
329 end_offset,
330 object_id,
331 sst.meta.estimated_size,
332 );
333 return Err(e.into());
334 }
335 Err(_) => {
336 return Err(HummockError::other("cancel by other thread"));
337 }
338 };
339 let mut offset = 0;
340 let mut blocks = VecDeque::default();
341 for idx in block_index..end_index {
342 let end = offset + sst.meta.block_metas[idx].len as usize;
343 if end > buf.len() {
344 return Err(ObjectError::internal("read unexpected EOF").into());
345 }
346 let block = Block::decode_with_copy(
348 buf.slice(offset..end),
349 sst.meta.block_metas[idx].uncompressed_size as usize,
350 true,
351 )?;
352 let holder = if let CachePolicy::Fill(priority) = policy {
353 let cache_priority = if idx == block_index {
354 priority
355 } else {
356 CacheHint::Low
357 };
358 let entry = self.block_cache.insert_with_hint(
359 SstableBlockIndex {
360 sst_id: object_id,
361 block_idx: idx as _,
362 },
363 Box::new(block),
364 cache_priority,
365 );
366 BlockHolder::from_hybrid_cache_entry(entry)
367 } else {
368 BlockHolder::from_owned_block(Box::new(block))
369 };
370
371 blocks.push_back(holder);
372 offset = end;
373 }
374 Ok(Box::new(PrefetchBlockStream::new(
375 blocks,
376 block_index,
377 Some(tracker),
378 )))
379 }
380
381 pub async fn get_block_response(
382 &self,
383 sst: &Sstable,
384 block_index: usize,
385 policy: CachePolicy,
386 stats: &mut StoreLocalStatistic,
387 ) -> HummockResult<BlockResponse> {
388 let object_id = sst.id;
389 let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
390 let store = self.store.clone();
391
392 stats.cache_data_block_total += 1;
393 let file_size = sst.meta.estimated_size;
394 let data_path = self.get_sst_data_path(object_id);
395
396 let disable_cache: fn() -> bool = || {
397 fail_point!("disable_block_cache", |_| true);
398 false
399 };
400
401 let policy = if disable_cache() {
402 CachePolicy::Disable
403 } else {
404 policy
405 };
406
407 let fetch_block = move || {
409 let range = range.clone();
410
411 async move {
412 let block_data = match store
413 .read(&data_path, range.clone())
414 .instrument_await("get_block_response".verbose())
415 .await
416 {
417 Ok(data) => data,
418 Err(e) => {
419 tracing::error!(
420 "get_block_response meet error when read {:?} from sst-{}, total length: {}",
421 range,
422 object_id,
423 file_size
424 );
425 return Err(anyhow::Error::from(HummockError::from(e)));
426 }
427 };
428 let block = Box::new(
429 Block::decode(block_data, uncompressed_capacity)
430 .map_err(anyhow::Error::from)?,
431 );
432 Ok(block)
433 }
434 };
435
436 if let Some(filter) = self.recent_filter.as_ref() {
437 filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
438 }
439
440 match policy {
441 CachePolicy::Fill(context) => {
442 let entry = self.block_cache.fetch_with_hint(
443 SstableBlockIndex {
444 sst_id: object_id,
445 block_idx: block_index as _,
446 },
447 context,
448 fetch_block,
449 );
450 if matches!(entry.state(), FetchState::Miss) {
451 stats.cache_data_block_miss += 1;
452 }
453 Ok(BlockResponse::Entry(entry))
454 }
455 CachePolicy::NotFill => {
456 match self
457 .block_cache
458 .get(&SstableBlockIndex {
459 sst_id: object_id,
460 block_idx: block_index as _,
461 })
462 .await
463 .map_err(HummockError::foyer_error)?
464 {
465 Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
466 entry,
467 ))),
468 _ => {
469 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
470 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
471 }
472 }
473 }
474 CachePolicy::Disable => {
475 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
476 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
477 }
478 }
479 }
480
481 pub async fn get(
482 &self,
483 sst: &Sstable,
484 block_index: usize,
485 policy: CachePolicy,
486 stats: &mut StoreLocalStatistic,
487 ) -> HummockResult<BlockHolder> {
488 match self
489 .get_block_response(sst, block_index, policy, stats)
490 .await
491 {
492 Ok(block_response) => block_response.wait().await,
493 Err(err) => Err(err),
494 }
495 }
496
497 pub fn get_sst_data_path(&self, object_id: HummockSstableObjectId) -> String {
498 let obj_prefix = self
499 .store
500 .get_object_prefix(object_id, self.use_new_object_prefix_strategy);
501 risingwave_hummock_sdk::get_sst_data_path(&obj_prefix, &self.path, object_id)
502 }
503
504 pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId {
505 risingwave_hummock_sdk::get_object_id_from_path(path)
506 }
507
508 pub fn store(&self) -> ObjectStoreRef {
509 self.store.clone()
510 }
511
/// Clears the block cache (test builds only).
#[cfg(any(test, feature = "test"))]
pub async fn clear_block_cache(&self) -> HummockResult<()> {
    let cleared = self.block_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
519
/// Clears the meta cache (test builds only).
#[cfg(any(test, feature = "test"))]
pub async fn clear_meta_cache(&self) -> HummockResult<()> {
    let cleared = self.meta_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
527
528 pub async fn sstable_cached(
529 &self,
530 sst_obj_id: HummockSstableObjectId,
531 ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
532 self.meta_cache
533 .get(&sst_obj_id)
534 .await
535 .map_err(HummockError::foyer_error)
536 }
537
538 pub fn sstable(
540 &self,
541 sstable_info_ref: &SstableInfo,
542 stats: &mut StoreLocalStatistic,
543 ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
544 let object_id = sstable_info_ref.object_id;
545
546 let entry = self.meta_cache.fetch(object_id, || {
547 let store = self.store.clone();
548 let meta_path = self.get_sst_data_path(object_id);
549 let stats_ptr = stats.remote_io_time.clone();
550 let range = sstable_info_ref.meta_offset as usize..;
551 async move {
552 let now = Instant::now();
553 let buf = store.read(&meta_path, range).await?;
554 let meta = SstableMeta::decode(&buf[..])?;
555
556 let sst = Sstable::new(object_id, meta);
557 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
558 stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
559 Ok(Box::new(sst))
560 }
561 });
562
563 if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
564 stats.cache_meta_block_miss += 1;
565 }
566
567 stats.cache_meta_block_total += 1;
568
569 async move { entry.await.map_err(HummockError::foyer_error) }
570 }
571
572 pub async fn list_object_metadata_from_object_store(
573 &self,
574 prefix: Option<String>,
575 start_after: Option<String>,
576 limit: Option<usize>,
577 ) -> HummockResult<ObjectMetadataIter> {
578 let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
579 let raw_iter = self.store.list(&list_path, start_after, limit).await?;
580 let iter = raw_iter.filter(|r| match r {
581 Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))),
582 Err(_) => future::ready(true),
583 });
584 Ok(Box::pin(iter))
585 }
586
587 pub fn create_sst_writer(
588 self: Arc<Self>,
589 object_id: HummockSstableObjectId,
590 options: SstableWriterOptions,
591 ) -> BatchUploadWriter {
592 BatchUploadWriter::new(object_id, self, options)
593 }
594
595 pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
596 let sst = Sstable::new(object_id, meta);
597 self.meta_cache.insert(object_id, Box::new(sst));
598 }
599
600 pub fn insert_block_cache(
601 &self,
602 object_id: HummockSstableObjectId,
603 block_index: u64,
604 block: Box<Block>,
605 ) {
606 self.block_cache.insert(
607 SstableBlockIndex {
608 sst_id: object_id,
609 block_idx: block_index,
610 },
611 block,
612 );
613 }
614
615 pub fn get_prefetch_memory_usage(&self) -> usize {
616 self.prefetch_buffer_usage.load(Ordering::Acquire)
617 }
618
619 pub async fn get_stream_for_blocks(
620 &self,
621 object_id: HummockSstableObjectId,
622 metas: &[BlockMeta],
623 ) -> HummockResult<BlockDataStream> {
624 fail_point!("get_stream_err");
625 let data_path = self.get_sst_data_path(object_id);
626 let store = self.store().clone();
627 let block_meta = &metas[0];
628 let start_pos = block_meta.offset as usize;
629 let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
630 let range = start_pos..end_pos;
631 let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;
633
634 let reader = match ret {
635 Ok(Ok(reader)) => reader,
636 Ok(Err(e)) => return Err(HummockError::from(e)),
637 Err(e) => {
638 return Err(HummockError::other(format!(
639 "failed to get result, this read request may be canceled: {}",
640 e.as_report()
641 )));
642 }
643 };
644 Ok(BlockDataStream::new(reader, metas.to_vec()))
645 }
646
647 pub fn data_recent_filter(
648 &self,
649 ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
650 self.recent_filter.as_ref()
651 }
652
653 pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
654 &self.meta_cache
655 }
656
657 pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
658 &self.block_cache
659 }
660
661 pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
662 self.recent_filter.as_ref()
663 }
664
665 pub async fn create_streaming_uploader(
666 &self,
667 path: &str,
668 ) -> ObjectResult<ObjectStreamingUploader> {
669 self.store.streaming_upload(path).await
670 }
671}
672
/// Shared, reference-counted handle to an [`SstableStore`].
pub type SstableStoreRef = Arc<SstableStore>;
674#[cfg(test)]
675mod tests {
676 use std::ops::Range;
677 use std::sync::Arc;
678
679 use risingwave_hummock_sdk::HummockSstableObjectId;
680 use risingwave_hummock_sdk::sstable_info::SstableInfo;
681
682 use super::{SstableStoreRef, SstableWriterOptions};
683 use crate::hummock::iterator::HummockIterator;
684 use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
685 use crate::hummock::sstable::SstableIteratorReadOptions;
686 use crate::hummock::test_utils::{
687 default_builder_opt_for_test, gen_test_sstable_data, put_sst,
688 };
689 use crate::hummock::value::HummockValue;
690 use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
691 use crate::monitor::StoreLocalStatistic;
692
/// Object id used for the SST written by the upload tests.
const SST_ID: HummockSstableObjectId = 1;
694
695 fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
696 HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
697 }
698
699 async fn validate_sst(
700 sstable_store: SstableStoreRef,
701 info: &SstableInfo,
702 mut meta: SstableMeta,
703 x_range: Range<usize>,
704 ) {
705 let mut stats = StoreLocalStatistic::default();
706 let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
707 std::mem::take(&mut meta.bloom_filter);
708 assert_eq!(holder.meta, meta);
709 let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
710 assert_eq!(holder.meta, meta);
711 let mut iter = SstableIterator::new(
712 holder,
713 sstable_store,
714 Arc::new(SstableIteratorReadOptions::default()),
715 info,
716 );
717 iter.rewind().await.unwrap();
718 for i in x_range {
719 let key = iter.key();
720 let value = iter.value();
721 assert_eq!(key, iterator_test_key_of(i).to_ref());
722 assert_eq!(value, get_hummock_value(i).as_slice());
723 iter.next().await.unwrap();
724 }
725 }
726
727 #[tokio::test]
728 async fn test_batch_upload() {
729 let sstable_store = mock_sstable_store().await;
730 let x_range = 0..100;
731 let (data, meta) = gen_test_sstable_data(
732 default_builder_opt_for_test(),
733 x_range
734 .clone()
735 .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
736 )
737 .await;
738 let writer_opts = SstableWriterOptions {
739 capacity_hint: None,
740 tracker: None,
741 policy: CachePolicy::Disable,
742 };
743 let info = put_sst(
744 SST_ID,
745 data.clone(),
746 meta.clone(),
747 sstable_store.clone(),
748 writer_opts,
749 vec![0],
750 )
751 .await
752 .unwrap();
753
754 validate_sst(sstable_store, &info, meta, x_range).await;
755 }
756
757 #[tokio::test]
758 async fn test_streaming_upload() {
759 let sstable_store = mock_sstable_store().await;
761 let x_range = 0..100;
762 let (data, meta) = gen_test_sstable_data(
763 default_builder_opt_for_test(),
764 x_range
765 .clone()
766 .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
767 )
768 .await;
769 let writer_opts = SstableWriterOptions {
770 capacity_hint: None,
771 tracker: None,
772 policy: CachePolicy::Disable,
773 };
774 let info = put_sst(
775 SST_ID,
776 data.clone(),
777 meta.clone(),
778 sstable_store.clone(),
779 writer_opts,
780 vec![0],
781 )
782 .await
783 .unwrap();
784
785 validate_sst(sstable_store, &info, meta, x_range).await;
786 }
787
788 #[tokio::test]
789 async fn test_basic() {
790 let sstable_store = mock_sstable_store().await;
791 let object_id = 123;
792 let data_path = sstable_store.get_sst_data_path(object_id);
793 assert_eq!(data_path, "test/123.data");
794 assert_eq!(SstableStore::get_object_id_from_path(&data_path), object_id);
795 }
796}