1use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21 Hint, HybridCache, HybridCacheBuilder, StorageKey as HybridKey, StorageValue as HybridValue,
22};
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::{TableId, TableOption};
26use risingwave_common::config::EvictionConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::util::epoch::test_epoch;
29use risingwave_common::util::row_serde::OrderedRowSerde;
30use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
31use risingwave_hummock_sdk::key_range::KeyRange;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{
34 EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
35};
36
37use super::iterator::test_utils::iterator_test_table_key_of;
38use super::{
39 DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
40};
41use crate::StateStore;
42use crate::compaction_catalog_manager::{
43 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
44};
45use crate::error::StorageResult;
46use crate::hummock::shared_buffer::shared_buffer_batch::{
47 SharedBufferBatch, SharedBufferItem, SharedBufferValue,
48};
49use crate::hummock::value::HummockValue;
50use crate::hummock::{
51 BlockedXor16FilterBuilder, CachePolicy, DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
52 FilterBuilder, FilterBuilderOptions, LruCache, Sstable, SstableBuilder, SstableBuilderOptions,
53 SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
54};
55use crate::monitor::StoreLocalStatistic;
56use crate::opts::StorageOpts;
57use crate::storage_value::StorageValue;
58use crate::store::*;
59
60pub fn default_opts_for_test() -> StorageOpts {
61 StorageOpts {
62 sstable_size_mb: 4,
63 block_size_kb: 64,
64 bloom_false_positive: 0.1,
65 share_buffers_sync_parallelism: 2,
66 share_buffer_compaction_worker_threads_number: 1,
67 shared_buffer_capacity_mb: 64,
68 data_directory: "hummock_001".to_owned(),
69 write_conflict_detection_enabled: true,
70 block_cache_capacity_mb: 64,
71 meta_cache_capacity_mb: 64,
72 block_cache_eviction_config: EvictionConfig::for_test(),
73 disable_remote_compactor: false,
74 share_buffer_upload_concurrency: 1,
75 compactor_memory_limit_mb: 64,
76 sstable_id_remote_fetch_number: 1,
77 vector_file_block_size_kb: 8,
78 ..Default::default()
79 }
80}
81
82pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
83 vec![(
84 TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
85 SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
86 )]
87}
88
89pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
90 let mut kvs = vec![];
91 let v = Bytes::from(b"value1".to_vec().repeat(100));
92 for idx in 0..n {
93 kvs.push((
94 TableKey(Bytes::from(iterator_test_table_key_of(idx))),
95 StorageValue::new_put(v.clone()),
96 ));
97 }
98 kvs
99}
100
101pub fn gen_dummy_sst_info(
102 id: u64,
103 batches: Vec<SharedBufferBatch>,
104 table_id: TableId,
105 epoch: HummockEpoch,
106) -> SstableInfo {
107 let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
108 let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
109 let mut file_size = 0;
110 for batch in batches.iter().skip(1) {
111 if min_table_key.as_slice() > *batch.start_table_key() {
112 min_table_key = batch.start_table_key().to_vec();
113 }
114 if max_table_key.as_slice() < *batch.end_table_key() {
115 max_table_key = batch.end_table_key().to_vec();
116 }
117 file_size += batch.size() as u64;
118 }
119 SstableInfoInner {
120 object_id: id.into(),
121 sst_id: id.into(),
122 key_range: KeyRange {
123 left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
124 right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
125 right_exclusive: false,
126 },
127 file_size,
128 table_ids: vec![table_id],
129 uncompressed_file_size: file_size,
130 min_epoch: epoch,
131 max_epoch: epoch,
132 sst_size: file_size,
133 ..Default::default()
134 }
135 .into()
136}
137
/// Number of entries written by `gen_default_test_sstable`.
pub const TEST_KEYS_COUNT: usize = 10_000;
140
141pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
142 SstableBuilderOptions {
143 capacity: 256 * (1 << 20), block_capacity: 4096, restart_interval: DEFAULT_RESTART_INTERVAL,
146 bloom_false_positive: 0.1,
147 ..Default::default()
148 }
149}
150
151pub fn default_writer_opt_for_test() -> SstableWriterOptions {
152 SstableWriterOptions {
153 capacity_hint: None,
154 tracker: None,
155 policy: CachePolicy::Disable,
156 }
157}
158
159pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
160 InMemWriter::from(opt)
161}
162
163pub async fn gen_test_sstable_data(
165 opts: SstableBuilderOptions,
166 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
167) -> (Bytes, SstableMeta) {
168 let table_id_to_vnode = HashMap::from_iter(vec![(
169 TableId::default().as_raw_id(),
170 VirtualNode::COUNT_FOR_TEST,
171 )]);
172 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
173 let mut b = SstableBuilder::for_test(
174 0,
175 mock_sst_writer(&opts),
176 opts,
177 table_id_to_vnode,
178 table_id_to_watermark_serde,
179 );
180 for (key, value) in kv_iter {
181 b.add_for_test(key.to_ref(), value.as_slice())
182 .await
183 .unwrap();
184 }
185 let output = b.finish().await.unwrap();
186 output.writer_output
187}
188
/// Uploads pre-built SST `data` through `sstable_store` as object `sst_object_id`, and
/// returns an [`SstableInfo`] describing it.
///
/// The block boundaries come from `meta.block_metas`. Every block is written from
/// `data[offset..offset + len]`. `meta` is then completed with a filter and
/// `meta_offset` before it is handed to the writer.
///
/// * `options` - writer options; its `policy` is always overridden with
///   `CachePolicy::NotFill`.
/// * `table_ids` - recorded verbatim in the returned `SstableInfo`.
///
/// # Errors
/// Propagates errors from writing blocks, finishing the writer, and the writer's output.
///
/// # Panics
/// Panics if a block's `offset`/`len` falls outside `data`, or if the outer result of the
/// awaited writer output is an error.
pub async fn put_sst(
    sst_object_id: u64,
    data: Bytes,
    mut meta: SstableMeta,
    sstable_store: SstableStoreRef,
    mut options: SstableWriterOptions,
    table_ids: Vec<u32>,
) -> HummockResult<SstableInfo> {
    options.policy = CachePolicy::NotFill;
    let mut writer = sstable_store
        .clone()
        .create_sst_writer(sst_object_id, options);
    // Re-emit the blocks exactly as laid out in `meta`.
    for block_meta in &meta.block_metas {
        let offset = block_meta.offset as usize;
        let end_offset = offset + block_meta.len as usize;
        writer
            .write_block(&data[offset..end_offset], block_meta)
            .await?;
    }

    // Build a blocked filter with one (empty) block per data block. No keys are added to it.
    let bloom_filter = {
        let mut filter_builder = BlockedXor16FilterBuilder::create(FilterBuilderOptions {
            estimated_key_count: 0,
            estimated_block_count: meta.block_metas.len(),
            hash_prealloc_key_count_cap: DEFAULT_FILTER_HASH_PREALLOC_KEY_COUNT_CAP,
        });
        for _ in &meta.block_metas {
            filter_builder.switch_block(None);
        }

        filter_builder.finish(None)
    };

    // Must be read after all blocks have been written.
    meta.meta_offset = writer.data_len() as u64;
    meta.bloom_filter = bloom_filter;
    // Built before `meta` is moved into `writer.finish` below.
    let sst = SstableInfoInner {
        object_id: sst_object_id.into(),
        sst_id: sst_object_id.into(),
        key_range: KeyRange {
            left: Bytes::from(meta.smallest_key.clone()),
            right: Bytes::from(meta.largest_key.clone()),
            right_exclusive: false,
        },
        file_size: meta.estimated_size as u64,
        meta_offset: meta.meta_offset,
        uncompressed_file_size: meta.estimated_size as u64,
        table_ids: table_ids.into_iter().map(Into::into).collect(),
        ..Default::default()
    }
    .into();
    let writer_output = writer.finish(meta).await?;
    writer_output.await.unwrap()?;
    Ok(sst)
}
245
246pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
248 opts: SstableBuilderOptions,
249 object_id: u64,
250 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
251 sstable_store: SstableStoreRef,
252 policy: CachePolicy,
253 table_id_to_vnode: HashMap<u32, usize>,
254 table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
255) -> SstableInfo {
256 let writer_opts = SstableWriterOptions {
257 capacity_hint: None,
258 tracker: None,
259 policy,
260 };
261 let writer = sstable_store
262 .clone()
263 .create_sst_writer(object_id, writer_opts);
264
265 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
266 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
267 table_id_to_vnode
268 .into_iter()
269 .map(|(table_id, v)| (table_id.into(), v))
270 .collect(),
271 table_id_to_watermark_serde
272 .into_iter()
273 .map(|(table_id, v)| (table_id.into(), v))
274 .collect(),
275 HashMap::default(),
276 ));
277
278 let mut b = SstableBuilder::<_, F>::new(
279 object_id,
280 writer,
281 F::create(opts.filter_builder_options()),
282 opts,
283 compaction_catalog_agent_ref,
284 None,
285 );
286
287 let mut last_key = FullKey::<B>::default();
288 for (key, value) in kv_iter {
289 let is_new_user_key =
290 last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
291 if is_new_user_key {
292 last_key = key.clone();
293 }
294
295 b.add(key.to_ref(), value.as_slice()).await.unwrap();
296 }
297 let output = b.finish().await.unwrap();
298 output.writer_output.await.unwrap().unwrap();
299 output.sst_info.sst_info
300}
301
302pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
304 opts: SstableBuilderOptions,
305 object_id: u64,
306 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
307 sstable_store: SstableStoreRef,
308) -> (TableHolder, SstableInfo) {
309 let table_id_to_vnode = HashMap::from_iter(vec![(
310 TableId::default().as_raw_id(),
311 VirtualNode::COUNT_FOR_TEST,
312 )]);
313
314 let table_id_to_watermark_serde =
315 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
316
317 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
318 opts,
319 object_id,
320 kv_iter,
321 sstable_store.clone(),
322 CachePolicy::NotFill,
323 table_id_to_vnode,
324 table_id_to_watermark_serde,
325 )
326 .await;
327
328 (
329 sstable_store
330 .sstable(&sst_info, &mut StoreLocalStatistic::default())
331 .await
332 .unwrap(),
333 sst_info,
334 )
335}
336
337pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
338 opts: SstableBuilderOptions,
339 object_id: u64,
340 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
341 sstable_store: SstableStoreRef,
342 table_ids: Vec<u32>,
343) -> (TableHolder, SstableInfo) {
344 let table_id_to_vnode = table_ids
345 .iter()
346 .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
347 .collect();
348 let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
349
350 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
351 opts,
352 object_id,
353 kv_iter,
354 sstable_store.clone(),
355 CachePolicy::NotFill,
356 table_id_to_vnode,
357 table_id_to_watermark_serde,
358 )
359 .await;
360
361 (
362 sstable_store
363 .sstable(&sst_info, &mut StoreLocalStatistic::default())
364 .await
365 .unwrap(),
366 sst_info,
367 )
368}
369
370pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
372 opts: SstableBuilderOptions,
373 object_id: u64,
374 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
375 sstable_store: SstableStoreRef,
376) -> SstableInfo {
377 let table_id_to_vnode = HashMap::from_iter(vec![(
378 TableId::default().as_raw_id(),
379 VirtualNode::COUNT_FOR_TEST,
380 )]);
381
382 let table_id_to_watermark_serde =
383 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
384
385 gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
386 opts,
387 object_id,
388 kv_iter,
389 sstable_store,
390 CachePolicy::NotFill,
391 table_id_to_vnode,
392 table_id_to_watermark_serde,
393 )
394 .await
395}
396
397pub async fn gen_test_sstable_with_range_tombstone(
399 opts: SstableBuilderOptions,
400 object_id: u64,
401 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
402 sstable_store: SstableStoreRef,
403) -> SstableInfo {
404 let table_id_to_vnode = HashMap::from_iter(vec![(
405 TableId::default().as_raw_id(),
406 VirtualNode::COUNT_FOR_TEST,
407 )]);
408
409 let table_id_to_watermark_serde =
410 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
411
412 gen_test_sstable_impl::<_, Xor16FilterBuilder>(
413 opts,
414 object_id,
415 kv_iter,
416 sstable_store.clone(),
417 CachePolicy::Fill(Hint::Normal),
418 table_id_to_vnode,
419 table_id_to_watermark_serde,
420 )
421 .await
422}
423
424pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
426 UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
427}
428
429pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
431 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
432 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
433 UserKey::for_test(TableId::default(), table_key)
434}
435
436pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
438 FullKey {
439 user_key: test_user_key_of(idx),
440 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
441 }
442}
443
444pub fn test_value_of(idx: usize) -> Vec<u8> {
446 "23332333"
447 .as_bytes()
448 .iter()
449 .cycle()
450 .cloned()
451 .take(idx % 100 + 1) .collect_vec()
453}
454
455pub async fn gen_default_test_sstable(
459 opts: SstableBuilderOptions,
460 object_id: u64,
461 sstable_store: SstableStoreRef,
462) -> (TableHolder, SstableInfo) {
463 gen_test_sstable(
464 opts,
465 object_id,
466 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
467 sstable_store,
468 )
469 .await
470}
471
472pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
473 let mut c: usize = 0;
474 while i.try_next().await.unwrap().is_some() {
475 c += 1
476 }
477 c
478}
479
480pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
481 Arc::new(LruCache::new(1, 4, 0))
482}
483
484pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
485where
486 K: HybridKey,
487 V: HybridValue,
488{
489 HybridCacheBuilder::new()
490 .memory(10)
491 .storage()
492 .build()
493 .await
494 .unwrap()
495}
496
497#[derive(Default, Clone)]
498pub struct StateStoreTestReadOptions {
499 pub table_id: TableId,
500 pub prefix_hint: Option<Bytes>,
501 pub prefetch_options: PrefetchOptions,
502 pub cache_policy: CachePolicy,
503 pub read_committed: bool,
504 pub retention_seconds: Option<u32>,
505 pub read_version_from_backup: bool,
506}
507
508impl StateStoreTestReadOptions {
509 fn get_read_epoch(&self, epoch: u64) -> HummockReadEpoch {
510 if self.read_version_from_backup {
511 HummockReadEpoch::Backup(epoch)
512 } else if self.read_committed {
513 HummockReadEpoch::Committed(epoch)
514 } else {
515 HummockReadEpoch::NoWait(epoch)
516 }
517 }
518}
519
520pub type ReadOptions = StateStoreTestReadOptions;
521
522impl From<StateStoreTestReadOptions> for crate::store::ReadOptions {
523 fn from(val: StateStoreTestReadOptions) -> crate::store::ReadOptions {
524 crate::store::ReadOptions {
525 prefix_hint: val.prefix_hint,
526 prefetch_options: val.prefetch_options,
527 cache_policy: val.cache_policy,
528 }
529 }
530}
531
532pub trait StateStoreReadTestExt: StateStore {
533 fn get_keyed_row(
537 &self,
538 key: TableKey<Bytes>,
539 epoch: u64,
540 read_options: ReadOptions,
541 ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
542
543 fn get(
547 &self,
548 key: TableKey<Bytes>,
549 epoch: u64,
550 read_options: ReadOptions,
551 ) -> impl StorageFuture<'_, Option<Bytes>> {
552 self.get_keyed_row(key, epoch, read_options)
553 .map_ok(|v| v.map(|(_, v)| v))
554 }
555
556 fn iter(
562 &self,
563 key_range: TableKeyRange,
564 epoch: u64,
565 read_options: ReadOptions,
566 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
567
568 fn rev_iter(
569 &self,
570 key_range: TableKeyRange,
571 epoch: u64,
572 read_options: ReadOptions,
573 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
574
575 fn scan(
576 &self,
577 key_range: TableKeyRange,
578 epoch: u64,
579 limit: Option<usize>,
580 read_options: ReadOptions,
581 ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
582}
583
584impl<S: StateStore> StateStoreReadTestExt for S {
585 async fn get_keyed_row(
586 &self,
587 key: TableKey<Bytes>,
588 epoch: u64,
589 read_options: ReadOptions,
590 ) -> StorageResult<Option<StateStoreKeyedRow>> {
591 let snapshot = self
592 .new_read_snapshot(
593 read_options.get_read_epoch(epoch),
594 NewReadSnapshotOptions {
595 table_id: read_options.table_id,
596 table_option: TableOption {
597 retention_seconds: read_options.retention_seconds,
598 },
599 },
600 )
601 .await?;
602 snapshot
603 .on_key_value(key, read_options.into(), |key, value| {
604 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
605 })
606 .await
607 }
608
609 async fn iter(
610 &self,
611 key_range: TableKeyRange,
612 epoch: u64,
613 read_options: ReadOptions,
614 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
615 let snapshot = self
616 .new_read_snapshot(
617 read_options.get_read_epoch(epoch),
618 NewReadSnapshotOptions {
619 table_id: read_options.table_id,
620 table_option: TableOption {
621 retention_seconds: read_options.retention_seconds,
622 },
623 },
624 )
625 .await?;
626 snapshot.iter(key_range, read_options.into()).await
627 }
628
629 async fn rev_iter(
630 &self,
631 key_range: TableKeyRange,
632 epoch: u64,
633 read_options: ReadOptions,
634 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
635 let snapshot = self
636 .new_read_snapshot(
637 read_options.get_read_epoch(epoch),
638 NewReadSnapshotOptions {
639 table_id: read_options.table_id,
640 table_option: TableOption {
641 retention_seconds: read_options.retention_seconds,
642 },
643 },
644 )
645 .await?;
646 snapshot.rev_iter(key_range, read_options.into()).await
647 }
648
649 async fn scan(
650 &self,
651 key_range: TableKeyRange,
652 epoch: u64,
653 limit: Option<usize>,
654 read_options: ReadOptions,
655 ) -> StorageResult<Vec<StateStoreKeyedRow>> {
656 const MAX_INITIAL_CAP: usize = 1024;
657 let limit = limit.unwrap_or(usize::MAX);
658 let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
659 let mut iter = self.iter(key_range, epoch, read_options).await?;
660 while let Some((key, value)) = iter.try_next().await? {
661 ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
662 }
663 Ok(ret)
664 }
665}
666
667pub trait StateStoreGetTestExt: StateStoreGet {
668 fn get(
669 &self,
670 key: TableKey<Bytes>,
671 read_options: ReadOptions,
672 ) -> impl StorageFuture<'_, Option<Bytes>>;
673}
674
675impl<S: StateStoreGet> StateStoreGetTestExt for S {
676 async fn get(
677 &self,
678 key: TableKey<Bytes>,
679 read_options: ReadOptions,
680 ) -> StorageResult<Option<Bytes>> {
681 self.on_key_value(key, read_options.into(), |_, value| {
682 Ok(Bytes::copy_from_slice(value))
683 })
684 .await
685 }
686}