1use std::clone::Clone;
15use std::collections::VecDeque;
16use std::future::Future;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::Bytes;
22use fail::fail_point;
23use foyer::{
24 Cache, CacheBuilder, CacheEntry, Engine, EventListener, FetchState, Hint, HybridCache,
25 HybridCacheBuilder, HybridCacheEntry, HybridCacheProperties, LargeEngineOptions,
26};
27use futures::{StreamExt, future};
28use risingwave_hummock_sdk::sstable_info::SstableInfo;
29use risingwave_hummock_sdk::vector_index::VectorFileInfo;
30use risingwave_hummock_sdk::{
31 HummockObjectId, HummockSstableObjectId, HummockVectorFileId, SST_OBJECT_SUFFIX,
32};
33use risingwave_hummock_trace::TracedCachePolicy;
34use risingwave_object_store::object::{
35 ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader,
36};
37use serde::{Deserialize, Serialize};
38use thiserror_ext::AsReport;
39use tokio::time::Instant;
40
41use super::{
42 BatchUploadWriter, Block, BlockMeta, BlockResponse, RecentFilter, Sstable, SstableMeta,
43 SstableWriterOptions,
44};
45use crate::hummock::block_stream::{
46 BlockDataStream, BlockStream, MemoryUsageTracker, PrefetchBlockStream,
47};
48use crate::hummock::vector::file::{VectorBlock, VectorBlockMeta, VectorFileMeta};
49use crate::hummock::{BlockHolder, HummockError, HummockResult};
50use crate::monitor::{HummockStateStoreMetrics, StoreLocalStatistic};
51
/// Entry handle returned by the hybrid SST meta cache: an [`Sstable`] keyed by its
/// SST object id.
pub type TableHolder = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>;
/// Entry handle returned by the vector meta cache: a [`VectorFileMeta`] keyed by
/// vector file id.
pub type VectorFileHolder = CacheEntry<HummockVectorFileId, Box<VectorFileMeta>>;
/// Entry handle returned by the vector block cache: a [`VectorBlock`] keyed by
/// `(vector file id, block index)`.
pub type VectorBlockHolder = CacheEntry<(HummockVectorFileId, usize), Box<VectorBlock>>;
55
/// Key of the block cache: identifies one block of one SST.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct SstableBlockIndex {
    /// Object id of the SST the block belongs to.
    pub sst_id: HummockSstableObjectId,
    /// Position of the block inside the SST (the same index used for
    /// `Sstable::meta.block_metas`), widened to `u64`.
    pub block_idx: u64,
}
61
/// Block cache event listener that reports block efficiency to the state store metrics.
pub struct BlockCacheEventListener {
    /// Metrics handle; this listener writes to its `block_efficiency_histogram`.
    metrics: Arc<HummockStateStoreMetrics>,
}
65
66impl BlockCacheEventListener {
67 pub fn new(metrics: Arc<HummockStateStoreMetrics>) -> Self {
68 Self { metrics }
69 }
70}
71
72impl EventListener for BlockCacheEventListener {
73 type Key = SstableBlockIndex;
74 type Value = Box<Block>;
75
76 fn on_leave(&self, _reason: foyer::Event, _key: &Self::Key, value: &Self::Value)
77 where
78 Self::Key: foyer::Key,
79 Self::Value: foyer::Value,
80 {
81 self.metrics
82 .block_efficiency_histogram
83 .observe(value.efficiency());
84 }
85}
86
/// Controls how a block read interacts with the block cache.
///
/// The per-variant notes describe how `SstableStore::get_block_response` interprets
/// the policy.
#[derive(Clone, Copy, Eq, PartialEq)]
pub enum CachePolicy {
    /// Read the block straight from the object store; the block cache is neither
    /// consulted nor filled.
    Disable,
    /// Read through the block cache and, on a miss, insert the block with the given hint.
    Fill(Hint),
    /// Look the block up in the block cache, but do not insert it when it is missing.
    NotFill,
}
97
98impl Default for CachePolicy {
99 fn default() -> Self {
100 CachePolicy::Fill(Hint::Normal)
101 }
102}
103
104impl From<TracedCachePolicy> for CachePolicy {
105 fn from(policy: TracedCachePolicy) -> Self {
106 match policy {
107 TracedCachePolicy::Disable => Self::Disable,
108 TracedCachePolicy::Fill(priority) => Self::Fill(priority.into()),
109 TracedCachePolicy::NotFill => Self::NotFill,
110 }
111 }
112}
113
114impl From<CachePolicy> for TracedCachePolicy {
115 fn from(policy: CachePolicy) -> Self {
116 match policy {
117 CachePolicy::Disable => Self::Disable,
118 CachePolicy::Fill(priority) => Self::Fill(priority.into()),
119 CachePolicy::NotFill => Self::NotFill,
120 }
121 }
122}
123
/// Everything needed to build an [`SstableStore`] with [`SstableStore::new`].
pub struct SstableStoreConfig {
    /// Object store that holds the SST and vector file objects.
    pub store: ObjectStoreRef,
    /// Path component passed to `risingwave_hummock_sdk::get_object_data_path` and
    /// used as the root when listing objects.
    pub path: String,

    /// Threshold for the prefetch buffer usage counter; once usage exceeds it,
    /// `prefetch_blocks` falls back to reading a single block.
    pub prefetch_buffer_capacity: usize,
    /// Upper bound on the number of blocks covered by one prefetch read.
    pub max_prefetch_block_number: usize,
    /// Optional filter that `get_block_response` extends with
    /// `(object_id, usize::MAX)` and `(object_id, block_index)` on every call.
    pub recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    /// State store metrics. NOTE(review): `SstableStore::new` in this file does not
    /// read this field.
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    /// Forwarded to `get_object_prefix` when object paths are built.
    pub use_new_object_prefix_strategy: bool,

    /// Hybrid cache of decoded SST metadata, keyed by SST object id.
    pub meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    /// Hybrid cache of decoded SST blocks.
    pub block_cache: HybridCache<SstableBlockIndex, Box<Block>>,

    /// Cache of decoded vector file metadata, keyed by vector file id.
    pub vector_meta_cache: Cache<HummockVectorFileId, Box<VectorFileMeta>>,
    /// Cache of decoded vector blocks, keyed by `(vector file id, block index)`.
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,
}
140
/// Reads and writes SST and vector file objects in an object store, fronted by meta
/// and block caches.
pub struct SstableStore {
    /// Path component used when building object paths and when listing objects.
    path: String,
    /// Object store that holds the objects.
    store: ObjectStoreRef,

    /// Hybrid cache of decoded SST metadata, keyed by SST object id.
    meta_cache: HybridCache<HummockSstableObjectId, Box<Sstable>>,
    /// Hybrid cache of decoded SST blocks.
    block_cache: HybridCache<SstableBlockIndex, Box<Block>>,
    /// Cache of decoded vector file metadata, keyed by vector file id.
    pub vector_meta_cache: Cache<HummockVectorFileId, Box<VectorFileMeta>>,
    /// Cache of decoded vector blocks, keyed by `(vector file id, block index)`.
    pub vector_block_cache: Cache<(HummockVectorFileId, usize), Box<VectorBlock>>,

    /// Optional filter extended on every `get_block_response` call.
    recent_filter: Option<Arc<RecentFilter<(HummockSstableObjectId, usize)>>>,
    /// Shared counter handed to a `MemoryUsageTracker` for each prefetch and compared
    /// against `prefetch_buffer_capacity`. Presumably the tracker keeps it in sync
    /// with the bytes held by live prefetch streams — confirm in `block_stream`.
    prefetch_buffer_usage: Arc<AtomicUsize>,
    /// Usage threshold above which `prefetch_blocks` reads only a single block.
    prefetch_buffer_capacity: usize,
    /// Upper bound on the number of blocks covered by one prefetch read.
    max_prefetch_block_number: usize,
    /// Forwarded to `get_object_prefix` when object paths are built.
    use_new_object_prefix_strategy: bool,
}
167
168impl SstableStore {
169 pub fn new(config: SstableStoreConfig) -> Self {
170 Self {
174 path: config.path,
175 store: config.store,
176
177 meta_cache: config.meta_cache,
178 block_cache: config.block_cache,
179 vector_meta_cache: config.vector_meta_cache,
180 vector_block_cache: config.vector_block_cache,
181
182 recent_filter: config.recent_filter,
183 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
184 prefetch_buffer_capacity: config.prefetch_buffer_capacity,
185 max_prefetch_block_number: config.max_prefetch_block_number,
186 use_new_object_prefix_strategy: config.use_new_object_prefix_strategy,
187 }
188 }
189
190 #[expect(clippy::borrowed_box)]
193 pub async fn for_compactor(
194 store: ObjectStoreRef,
195 path: String,
196 block_cache_capacity: usize,
197 meta_cache_capacity: usize,
198 use_new_object_prefix_strategy: bool,
199 ) -> HummockResult<Self> {
200 let meta_cache = HybridCacheBuilder::new()
201 .memory(meta_cache_capacity)
202 .with_shards(1)
203 .with_weighter(|_: &HummockSstableObjectId, value: &Box<Sstable>| {
204 u64::BITS as usize / 8 + value.estimate_size()
205 })
206 .storage(Engine::Large(LargeEngineOptions::new()))
207 .build()
208 .await
209 .map_err(HummockError::foyer_error)?;
210
211 let block_cache = HybridCacheBuilder::new()
212 .memory(block_cache_capacity)
213 .with_shards(1)
214 .with_weighter(|_: &SstableBlockIndex, value: &Box<Block>| {
215 u64::BITS as usize * 2 / 8 + value.raw().len()
217 })
218 .storage(Engine::Large(LargeEngineOptions::new()))
219 .build()
220 .await
221 .map_err(HummockError::foyer_error)?;
222
223 Ok(Self {
224 path,
225 store,
226
227 prefetch_buffer_usage: Arc::new(AtomicUsize::new(0)),
228 prefetch_buffer_capacity: block_cache_capacity,
229 max_prefetch_block_number: 16, recent_filter: None,
231 use_new_object_prefix_strategy,
232
233 meta_cache,
234 block_cache,
235 vector_meta_cache: CacheBuilder::new(1 << 10).build(),
236 vector_block_cache: CacheBuilder::new(1 << 10).build(),
237 })
238 }
239
240 pub async fn delete(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
241 self.store
242 .delete(self.get_sst_data_path(object_id).as_str())
243 .await?;
244 self.meta_cache.remove(&object_id);
245 Ok(())
247 }
248
249 pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> {
250 self.meta_cache.remove(&object_id);
251 Ok(())
252 }
253
254 pub(crate) async fn put_sst_data(
255 &self,
256 object_id: HummockSstableObjectId,
257 data: Bytes,
258 ) -> HummockResult<()> {
259 let data_path = self.get_sst_data_path(object_id);
260 self.store
261 .upload(&data_path, data)
262 .await
263 .map_err(Into::into)
264 }
265
266 pub async fn prefetch_blocks(
267 &self,
268 sst: &Sstable,
269 block_index: usize,
270 end_index: usize,
271 policy: CachePolicy,
272 stats: &mut StoreLocalStatistic,
273 ) -> HummockResult<Box<dyn BlockStream>> {
274 let object_id = sst.id;
275 if self.prefetch_buffer_usage.load(Ordering::Acquire) > self.prefetch_buffer_capacity {
276 let block = self.get(sst, block_index, policy, stats).await?;
277 return Ok(Box::new(PrefetchBlockStream::new(
278 VecDeque::from([block]),
279 block_index,
280 None,
281 )));
282 }
283 stats.cache_data_block_total += 1;
284 if let Some(entry) = self
285 .block_cache
286 .get(&SstableBlockIndex {
287 sst_id: object_id,
288 block_idx: block_index as _,
289 })
290 .await
291 .map_err(HummockError::foyer_error)?
292 {
293 let block = BlockHolder::from_hybrid_cache_entry(entry);
294 return Ok(Box::new(PrefetchBlockStream::new(
295 VecDeque::from([block]),
296 block_index,
297 None,
298 )));
299 }
300 let end_index = std::cmp::min(end_index, block_index + self.max_prefetch_block_number);
301 let mut end_index = std::cmp::min(end_index, sst.meta.block_metas.len());
302 let start_offset = sst.meta.block_metas[block_index].offset as usize;
303 let mut min_hit_index = end_index;
304 let mut hit_count = 0;
305 for idx in block_index..end_index {
306 if self.block_cache.contains(&SstableBlockIndex {
307 sst_id: object_id,
308 block_idx: idx as _,
309 }) {
310 if min_hit_index > idx && idx > block_index {
311 min_hit_index = idx;
312 }
313 hit_count += 1;
314 }
315 }
316
317 if hit_count * 3 >= (end_index - block_index) || min_hit_index * 2 > block_index + end_index
318 {
319 end_index = min_hit_index;
320 }
321 stats.cache_data_prefetch_count += 1;
322 stats.cache_data_prefetch_block_count += (end_index - block_index) as u64;
323 let end_offset = start_offset
324 + sst.meta.block_metas[block_index..end_index]
325 .iter()
326 .map(|meta| meta.len as usize)
327 .sum::<usize>();
328 let data_path = self.get_sst_data_path(object_id);
329 let memory_usage = end_offset - start_offset;
330 let tracker = MemoryUsageTracker::new(self.prefetch_buffer_usage.clone(), memory_usage);
331 let span = await_tree::span!("Prefetch SST-{}", object_id).verbose();
332 let store = self.store.clone();
333 let join_handle = tokio::spawn(async move {
334 store
335 .read(&data_path, start_offset..end_offset)
336 .instrument_await(span)
337 .await
338 });
339 let buf = match join_handle.await {
340 Ok(Ok(data)) => data,
341 Ok(Err(e)) => {
342 tracing::error!(
343 "prefetch meet error when read {}..{} from sst-{} ({})",
344 start_offset,
345 end_offset,
346 object_id,
347 sst.meta.estimated_size,
348 );
349 return Err(e.into());
350 }
351 Err(_) => {
352 return Err(HummockError::other("cancel by other thread"));
353 }
354 };
355 let mut offset = 0;
356 let mut blocks = VecDeque::default();
357 for idx in block_index..end_index {
358 let end = offset + sst.meta.block_metas[idx].len as usize;
359 if end > buf.len() {
360 return Err(ObjectError::internal("read unexpected EOF").into());
361 }
362 let block = Block::decode_with_copy(
364 buf.slice(offset..end),
365 sst.meta.block_metas[idx].uncompressed_size as usize,
366 true,
367 )?;
368 let holder = if let CachePolicy::Fill(hint) = policy {
369 let hint = if idx == block_index { hint } else { Hint::Low };
370 let entry = self.block_cache.insert_with_properties(
371 SstableBlockIndex {
372 sst_id: object_id,
373 block_idx: idx as _,
374 },
375 Box::new(block),
376 HybridCacheProperties::default().with_hint(hint),
377 );
378 BlockHolder::from_hybrid_cache_entry(entry)
379 } else {
380 BlockHolder::from_owned_block(Box::new(block))
381 };
382
383 blocks.push_back(holder);
384 offset = end;
385 }
386 Ok(Box::new(PrefetchBlockStream::new(
387 blocks,
388 block_index,
389 Some(tracker),
390 )))
391 }
392
393 pub async fn get_block_response(
394 &self,
395 sst: &Sstable,
396 block_index: usize,
397 policy: CachePolicy,
398 stats: &mut StoreLocalStatistic,
399 ) -> HummockResult<BlockResponse> {
400 let object_id = sst.id;
401 let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
402 let store = self.store.clone();
403
404 stats.cache_data_block_total += 1;
405 let file_size = sst.meta.estimated_size;
406 let data_path = self.get_sst_data_path(object_id);
407
408 let disable_cache: fn() -> bool = || {
409 fail_point!("disable_block_cache", |_| true);
410 false
411 };
412
413 let policy = if disable_cache() {
414 CachePolicy::Disable
415 } else {
416 policy
417 };
418
419 let fetch_block = move || {
421 let range = range.clone();
422
423 async move {
424 let block_data = match store
425 .read(&data_path, range.clone())
426 .instrument_await("get_block_response".verbose())
427 .await
428 {
429 Ok(data) => data,
430 Err(e) => {
431 tracing::error!(
432 "get_block_response meet error when read {:?} from sst-{}, total length: {}",
433 range,
434 object_id,
435 file_size
436 );
437 return Err(anyhow::Error::from(HummockError::from(e)));
438 }
439 };
440 let block = Box::new(
441 Block::decode(block_data, uncompressed_capacity)
442 .map_err(anyhow::Error::from)?,
443 );
444 Ok(block)
445 }
446 };
447
448 if let Some(filter) = self.recent_filter.as_ref() {
449 filter.extend([(object_id, usize::MAX), (object_id, block_index)]);
450 }
451
452 match policy {
453 CachePolicy::Fill(hint) => {
454 let entry = self.block_cache.fetch_with_properties(
455 SstableBlockIndex {
456 sst_id: object_id,
457 block_idx: block_index as _,
458 },
459 HybridCacheProperties::default().with_hint(hint),
460 fetch_block,
461 );
462 if matches!(entry.state(), FetchState::Miss) {
463 stats.cache_data_block_miss += 1;
464 }
465 Ok(BlockResponse::Entry(entry))
466 }
467 CachePolicy::NotFill => {
468 match self
469 .block_cache
470 .get(&SstableBlockIndex {
471 sst_id: object_id,
472 block_idx: block_index as _,
473 })
474 .await
475 .map_err(HummockError::foyer_error)?
476 {
477 Some(entry) => Ok(BlockResponse::Block(BlockHolder::from_hybrid_cache_entry(
478 entry,
479 ))),
480 _ => {
481 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
482 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
483 }
484 }
485 }
486 CachePolicy::Disable => {
487 let block = fetch_block().await.map_err(HummockError::foyer_error)?;
488 Ok(BlockResponse::Block(BlockHolder::from_owned_block(block)))
489 }
490 }
491 }
492
493 pub async fn get(
494 &self,
495 sst: &Sstable,
496 block_index: usize,
497 policy: CachePolicy,
498 stats: &mut StoreLocalStatistic,
499 ) -> HummockResult<BlockHolder> {
500 match self
501 .get_block_response(sst, block_index, policy, stats)
502 .await
503 {
504 Ok(block_response) => block_response.wait().await,
505 Err(err) => Err(err),
506 }
507 }
508
509 pub async fn get_vector_file_meta(
510 &self,
511 vector_file: &VectorFileInfo,
512 ) -> HummockResult<VectorFileHolder> {
513 self.vector_meta_cache
514 .fetch(vector_file.object_id, || {
515 let store = self.store.clone();
516 let path =
517 self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
518 let meta_offset = vector_file.meta_offset;
519 async move {
520 let encoded_footer = store.read(&path, meta_offset..).await?;
521 let meta = VectorFileMeta::decode_footer(&encoded_footer)?;
522 Ok(Box::new(meta))
523 }
524 })
525 .await
526 }
527
528 pub async fn get_vector_block(
529 &self,
530 vector_file: &VectorFileInfo,
531 block_idx: usize,
532 block_meta: &VectorBlockMeta,
533 ) -> HummockResult<VectorBlockHolder> {
534 self.vector_block_cache
535 .fetch((vector_file.object_id, block_idx), || {
536 let store = self.store.clone();
537 let path =
538 self.get_object_data_path(HummockObjectId::VectorFile(vector_file.object_id));
539 let start_offset = block_meta.offset;
540 let end_offset = start_offset + block_meta.block_size;
541 async move {
542 let encoded_block = store.read(&path, start_offset..end_offset).await?;
543 let block = VectorBlock::decode(&encoded_block)?;
544 Ok(Box::new(block))
545 }
546 })
547 .await
548 }
549
550 pub fn insert_vector_cache(
551 &self,
552 object_id: HummockVectorFileId,
553 meta: VectorFileMeta,
554 blocks: Vec<VectorBlock>,
555 ) {
556 self.vector_meta_cache.insert(object_id, Box::new(meta));
557 for (idx, block) in blocks.into_iter().enumerate() {
558 self.vector_block_cache
559 .insert((object_id, idx), Box::new(block));
560 }
561 }
562
563 pub fn get_sst_data_path(&self, object_id: impl Into<HummockSstableObjectId>) -> String {
564 self.get_object_data_path(HummockObjectId::Sstable(object_id.into()))
565 }
566
567 pub fn get_object_data_path(&self, object_id: HummockObjectId) -> String {
568 let obj_prefix = self.store.get_object_prefix(
569 object_id.as_raw().inner(),
570 self.use_new_object_prefix_strategy,
571 );
572 risingwave_hummock_sdk::get_object_data_path(&obj_prefix, &self.path, object_id)
573 }
574
575 pub fn get_object_id_from_path(path: &str) -> HummockObjectId {
576 risingwave_hummock_sdk::get_object_id_from_path(path)
577 }
578
579 pub fn store(&self) -> ObjectStoreRef {
580 self.store.clone()
581 }
582
/// Clears the block cache. Only compiled for tests.
///
/// # Errors
///
/// Returns the cache error, wrapped as a foyer error, if clearing fails.
#[cfg(any(test, feature = "test"))]
pub async fn clear_block_cache(&self) -> HummockResult<()> {
    let cleared = self.block_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
590
/// Clears the SST meta cache. Only compiled for tests.
///
/// # Errors
///
/// Returns the cache error, wrapped as a foyer error, if clearing fails.
#[cfg(any(test, feature = "test"))]
pub async fn clear_meta_cache(&self) -> HummockResult<()> {
    let cleared = self.meta_cache.clear().await;
    cleared.map_err(HummockError::foyer_error)
}
598
599 pub async fn sstable_cached(
600 &self,
601 sst_obj_id: HummockSstableObjectId,
602 ) -> HummockResult<Option<HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>> {
603 self.meta_cache
604 .get(&sst_obj_id)
605 .await
606 .map_err(HummockError::foyer_error)
607 }
608
609 pub fn sstable(
611 &self,
612 sstable_info_ref: &SstableInfo,
613 stats: &mut StoreLocalStatistic,
614 ) -> impl Future<Output = HummockResult<TableHolder>> + Send + 'static + use<> {
615 let object_id = sstable_info_ref.object_id;
616
617 let entry = self.meta_cache.fetch(object_id, || {
618 let store = self.store.clone();
619 let meta_path = self.get_sst_data_path(object_id);
620 let stats_ptr = stats.remote_io_time.clone();
621 let range = sstable_info_ref.meta_offset as usize..;
622 async move {
623 let now = Instant::now();
624 let buf = store.read(&meta_path, range).await?;
625 let meta = SstableMeta::decode(&buf[..])?;
626
627 let sst = Sstable::new(object_id, meta);
628 let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
629 stats_ptr.fetch_add(add as u64, Ordering::Relaxed);
630 Ok(Box::new(sst))
631 }
632 });
633
634 if matches! { entry.state(), FetchState::Wait | FetchState::Miss } {
635 stats.cache_meta_block_miss += 1;
636 }
637
638 stats.cache_meta_block_total += 1;
639
640 async move { entry.await.map_err(HummockError::foyer_error) }
641 }
642
643 pub async fn list_sst_object_metadata_from_object_store(
644 &self,
645 prefix: Option<String>,
646 start_after: Option<String>,
647 limit: Option<usize>,
648 ) -> HummockResult<ObjectMetadataIter> {
649 let list_path = format!("{}/{}", self.path, prefix.unwrap_or("".into()));
650 let raw_iter = self.store.list(&list_path, start_after, limit).await?;
651 let iter = raw_iter.filter(|r| match r {
652 Ok(i) => future::ready(i.key.ends_with(&format!(".{}", SST_OBJECT_SUFFIX))),
653 Err(_) => future::ready(true),
654 });
655 Ok(Box::pin(iter))
656 }
657
658 pub fn create_sst_writer(
659 self: Arc<Self>,
660 object_id: impl Into<HummockSstableObjectId>,
661 options: SstableWriterOptions,
662 ) -> BatchUploadWriter {
663 BatchUploadWriter::new(object_id, self, options)
664 }
665
666 pub fn insert_meta_cache(&self, object_id: HummockSstableObjectId, meta: SstableMeta) {
667 let sst = Sstable::new(object_id, meta);
668 self.meta_cache.insert(object_id, Box::new(sst));
669 }
670
671 pub fn insert_block_cache(
672 &self,
673 object_id: HummockSstableObjectId,
674 block_index: u64,
675 block: Box<Block>,
676 ) {
677 self.block_cache.insert(
678 SstableBlockIndex {
679 sst_id: object_id,
680 block_idx: block_index,
681 },
682 block,
683 );
684 }
685
686 pub fn get_prefetch_memory_usage(&self) -> usize {
687 self.prefetch_buffer_usage.load(Ordering::Acquire)
688 }
689
690 pub async fn get_stream_for_blocks(
691 &self,
692 object_id: HummockSstableObjectId,
693 metas: &[BlockMeta],
694 ) -> HummockResult<BlockDataStream> {
695 fail_point!("get_stream_err");
696 let data_path = self.get_sst_data_path(object_id);
697 let store = self.store().clone();
698 let block_meta = &metas[0];
699 let start_pos = block_meta.offset as usize;
700 let end_pos = metas.iter().map(|meta| meta.len as usize).sum::<usize>() + start_pos;
701 let range = start_pos..end_pos;
702 let ret = tokio::spawn(async move { store.streaming_read(&data_path, range).await }).await;
704
705 let reader = match ret {
706 Ok(Ok(reader)) => reader,
707 Ok(Err(e)) => return Err(HummockError::from(e)),
708 Err(e) => {
709 return Err(HummockError::other(format!(
710 "failed to get result, this read request may be canceled: {}",
711 e.as_report()
712 )));
713 }
714 };
715 Ok(BlockDataStream::new(reader, metas.to_vec()))
716 }
717
718 pub fn data_recent_filter(
719 &self,
720 ) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
721 self.recent_filter.as_ref()
722 }
723
724 pub fn meta_cache(&self) -> &HybridCache<HummockSstableObjectId, Box<Sstable>> {
725 &self.meta_cache
726 }
727
728 pub fn block_cache(&self) -> &HybridCache<SstableBlockIndex, Box<Block>> {
729 &self.block_cache
730 }
731
732 pub fn recent_filter(&self) -> Option<&Arc<RecentFilter<(HummockSstableObjectId, usize)>>> {
733 self.recent_filter.as_ref()
734 }
735
736 pub async fn create_streaming_uploader(
737 &self,
738 path: &str,
739 ) -> ObjectResult<ObjectStreamingUploader> {
740 self.store.streaming_upload(path).await
741 }
742}
743
/// Shared, reference-counted handle to an [`SstableStore`].
pub type SstableStoreRef = Arc<SstableStore>;
745#[cfg(test)]
746mod tests {
747 use std::ops::Range;
748 use std::sync::Arc;
749
750 use risingwave_hummock_sdk::HummockObjectId;
751 use risingwave_hummock_sdk::sstable_info::SstableInfo;
752
753 use super::{SstableStoreRef, SstableWriterOptions};
754 use crate::hummock::iterator::HummockIterator;
755 use crate::hummock::iterator::test_utils::{iterator_test_key_of, mock_sstable_store};
756 use crate::hummock::sstable::SstableIteratorReadOptions;
757 use crate::hummock::test_utils::{
758 default_builder_opt_for_test, gen_test_sstable_data, put_sst,
759 };
760 use crate::hummock::value::HummockValue;
761 use crate::hummock::{CachePolicy, SstableIterator, SstableMeta, SstableStore};
762 use crate::monitor::StoreLocalStatistic;
763
/// Object id given to the SST written by the upload tests.
const SST_ID: u64 = 1;
765
766 fn get_hummock_value(x: usize) -> HummockValue<Vec<u8>> {
767 HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec())
768 }
769
770 async fn validate_sst(
771 sstable_store: SstableStoreRef,
772 info: &SstableInfo,
773 mut meta: SstableMeta,
774 x_range: Range<usize>,
775 ) {
776 let mut stats = StoreLocalStatistic::default();
777 let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
778 std::mem::take(&mut meta.bloom_filter);
779 assert_eq!(holder.meta, meta);
780 let holder = sstable_store.sstable(info, &mut stats).await.unwrap();
781 assert_eq!(holder.meta, meta);
782 let mut iter = SstableIterator::new(
783 holder,
784 sstable_store,
785 Arc::new(SstableIteratorReadOptions::default()),
786 info,
787 );
788 iter.rewind().await.unwrap();
789 for i in x_range {
790 let key = iter.key();
791 let value = iter.value();
792 assert_eq!(key, iterator_test_key_of(i).to_ref());
793 assert_eq!(value, get_hummock_value(i).as_slice());
794 iter.next().await.unwrap();
795 }
796 }
797
798 #[tokio::test]
799 async fn test_batch_upload() {
800 let sstable_store = mock_sstable_store().await;
801 let x_range = 0..100;
802 let (data, meta) = gen_test_sstable_data(
803 default_builder_opt_for_test(),
804 x_range
805 .clone()
806 .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
807 )
808 .await;
809 let writer_opts = SstableWriterOptions {
810 capacity_hint: None,
811 tracker: None,
812 policy: CachePolicy::Disable,
813 };
814 let info = put_sst(
815 SST_ID,
816 data.clone(),
817 meta.clone(),
818 sstable_store.clone(),
819 writer_opts,
820 vec![0],
821 )
822 .await
823 .unwrap();
824
825 validate_sst(sstable_store, &info, meta, x_range).await;
826 }
827
828 #[tokio::test]
829 async fn test_streaming_upload() {
830 let sstable_store = mock_sstable_store().await;
832 let x_range = 0..100;
833 let (data, meta) = gen_test_sstable_data(
834 default_builder_opt_for_test(),
835 x_range
836 .clone()
837 .map(|x| (iterator_test_key_of(x), get_hummock_value(x))),
838 )
839 .await;
840 let writer_opts = SstableWriterOptions {
841 capacity_hint: None,
842 tracker: None,
843 policy: CachePolicy::Disable,
844 };
845 let info = put_sst(
846 SST_ID,
847 data.clone(),
848 meta.clone(),
849 sstable_store.clone(),
850 writer_opts,
851 vec![0],
852 )
853 .await
854 .unwrap();
855
856 validate_sst(sstable_store, &info, meta, x_range).await;
857 }
858
859 #[tokio::test]
860 async fn test_basic() {
861 let sstable_store = mock_sstable_store().await;
862 let object_id = 123;
863 let data_path = sstable_store.get_sst_data_path(object_id);
864 assert_eq!(data_path, "test/123.data");
865 assert_eq!(
866 SstableStore::get_object_id_from_path(&data_path),
867 HummockObjectId::Sstable(object_id.into())
868 );
869 }
870}