1use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21 Hint, HybridCache, HybridCacheBuilder, StorageKey as HybridKey, StorageValue as HybridValue,
22};
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::TableId;
26use risingwave_common::config::EvictionConfig;
27use risingwave_common::hash::VirtualNode;
28use risingwave_common::util::epoch::test_epoch;
29use risingwave_common::util::row_serde::OrderedRowSerde;
30use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
31use risingwave_hummock_sdk::key_range::KeyRange;
32use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
33use risingwave_hummock_sdk::{
34 EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
35};
36
37use super::iterator::test_utils::iterator_test_table_key_of;
38use super::{
39 DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
40};
41use crate::StateStore;
42use crate::compaction_catalog_manager::{
43 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
44};
45use crate::error::StorageResult;
46use crate::hummock::shared_buffer::shared_buffer_batch::{
47 SharedBufferBatch, SharedBufferItem, SharedBufferValue,
48};
49use crate::hummock::value::HummockValue;
50use crate::hummock::{
51 BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
52 SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
53};
54use crate::monitor::StoreLocalStatistic;
55use crate::opts::StorageOpts;
56use crate::storage_value::StorageValue;
57use crate::store::*;
58
59pub fn default_opts_for_test() -> StorageOpts {
60 StorageOpts {
61 sstable_size_mb: 4,
62 block_size_kb: 64,
63 bloom_false_positive: 0.1,
64 share_buffers_sync_parallelism: 2,
65 share_buffer_compaction_worker_threads_number: 1,
66 shared_buffer_capacity_mb: 64,
67 data_directory: "hummock_001".to_owned(),
68 write_conflict_detection_enabled: true,
69 block_cache_capacity_mb: 64,
70 meta_cache_capacity_mb: 64,
71 block_cache_eviction_config: EvictionConfig::for_test(),
72 disable_remote_compactor: false,
73 share_buffer_upload_concurrency: 1,
74 compactor_memory_limit_mb: 64,
75 sstable_id_remote_fetch_number: 1,
76 vector_file_block_size_kb: 8,
77 ..Default::default()
78 }
79}
80
81pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
82 vec![(
83 TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
84 SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
85 )]
86}
87
88pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
89 let mut kvs = vec![];
90 let v = Bytes::from(b"value1".to_vec().repeat(100));
91 for idx in 0..n {
92 kvs.push((
93 TableKey(Bytes::from(iterator_test_table_key_of(idx))),
94 StorageValue::new_put(v.clone()),
95 ));
96 }
97 kvs
98}
99
100pub fn gen_dummy_sst_info(
101 id: u64,
102 batches: Vec<SharedBufferBatch>,
103 table_id: TableId,
104 epoch: HummockEpoch,
105) -> SstableInfo {
106 let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
107 let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
108 let mut file_size = 0;
109 for batch in batches.iter().skip(1) {
110 if min_table_key.as_slice() > *batch.start_table_key() {
111 min_table_key = batch.start_table_key().to_vec();
112 }
113 if max_table_key.as_slice() < *batch.end_table_key() {
114 max_table_key = batch.end_table_key().to_vec();
115 }
116 file_size += batch.size() as u64;
117 }
118 SstableInfoInner {
119 object_id: id.into(),
120 sst_id: id.into(),
121 key_range: KeyRange {
122 left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
123 right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
124 right_exclusive: false,
125 },
126 file_size,
127 table_ids: vec![table_id],
128 uncompressed_file_size: file_size,
129 min_epoch: epoch,
130 max_epoch: epoch,
131 sst_size: file_size,
132 ..Default::default()
133 }
134 .into()
135}
136
/// Number of key-value pairs written by [`gen_default_test_sstable`].
pub const TEST_KEYS_COUNT: usize = 10_000;
139
140pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
141 SstableBuilderOptions {
142 capacity: 256 * (1 << 20), block_capacity: 4096, restart_interval: DEFAULT_RESTART_INTERVAL,
145 bloom_false_positive: 0.1,
146 ..Default::default()
147 }
148}
149
150pub fn default_writer_opt_for_test() -> SstableWriterOptions {
151 SstableWriterOptions {
152 capacity_hint: None,
153 tracker: None,
154 policy: CachePolicy::Disable,
155 }
156}
157
158pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
159 InMemWriter::from(opt)
160}
161
162pub async fn gen_test_sstable_data(
164 opts: SstableBuilderOptions,
165 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
166) -> (Bytes, SstableMeta) {
167 let table_id_to_vnode = HashMap::from_iter(vec![(
168 TableId::default().as_raw_id(),
169 VirtualNode::COUNT_FOR_TEST,
170 )]);
171 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
172 let mut b = SstableBuilder::for_test(
173 0,
174 mock_sst_writer(&opts),
175 opts,
176 table_id_to_vnode,
177 table_id_to_watermark_serde,
178 );
179 for (key, value) in kv_iter {
180 b.add_for_test(key.to_ref(), value.as_slice())
181 .await
182 .unwrap();
183 }
184 let output = b.finish().await.unwrap();
185 output.writer_output
186}
187
188pub async fn put_sst(
190 sst_object_id: u64,
191 data: Bytes,
192 mut meta: SstableMeta,
193 sstable_store: SstableStoreRef,
194 mut options: SstableWriterOptions,
195 table_ids: Vec<u32>,
196) -> HummockResult<SstableInfo> {
197 options.policy = CachePolicy::NotFill;
198 let mut writer = sstable_store
199 .clone()
200 .create_sst_writer(sst_object_id, options);
201 for block_meta in &meta.block_metas {
202 let offset = block_meta.offset as usize;
203 let end_offset = offset + block_meta.len as usize;
204 writer
205 .write_block(&data[offset..end_offset], block_meta)
206 .await?;
207 }
208
209 let bloom_filter = {
211 let mut filter_builder = BlockedXor16FilterBuilder::new(100);
212 for _ in &meta.block_metas {
213 filter_builder.switch_block(None);
214 }
215
216 filter_builder.finish(None)
217 };
218
219 meta.meta_offset = writer.data_len() as u64;
220 meta.bloom_filter = bloom_filter;
221 let sst = SstableInfoInner {
222 object_id: sst_object_id.into(),
223 sst_id: sst_object_id.into(),
224 key_range: KeyRange {
225 left: Bytes::from(meta.smallest_key.clone()),
226 right: Bytes::from(meta.largest_key.clone()),
227 right_exclusive: false,
228 },
229 file_size: meta.estimated_size as u64,
230 meta_offset: meta.meta_offset,
231 uncompressed_file_size: meta.estimated_size as u64,
232 table_ids: table_ids.into_iter().map(Into::into).collect(),
233 ..Default::default()
234 }
235 .into();
236 let writer_output = writer.finish(meta).await?;
237 writer_output.await.unwrap()?;
238 Ok(sst)
239}
240
241pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
243 opts: SstableBuilderOptions,
244 object_id: u64,
245 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
246 sstable_store: SstableStoreRef,
247 policy: CachePolicy,
248 table_id_to_vnode: HashMap<u32, usize>,
249 table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
250) -> SstableInfo {
251 let writer_opts = SstableWriterOptions {
252 capacity_hint: None,
253 tracker: None,
254 policy,
255 };
256 let writer = sstable_store
257 .clone()
258 .create_sst_writer(object_id, writer_opts);
259
260 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
261 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
262 table_id_to_vnode
263 .into_iter()
264 .map(|(table_id, v)| (table_id.into(), v))
265 .collect(),
266 table_id_to_watermark_serde
267 .into_iter()
268 .map(|(table_id, v)| (table_id.into(), v))
269 .collect(),
270 HashMap::default(),
271 ));
272
273 let mut b = SstableBuilder::<_, F>::new(
274 object_id,
275 writer,
276 F::create(opts.bloom_false_positive, opts.capacity / 16),
277 opts,
278 compaction_catalog_agent_ref,
279 None,
280 );
281
282 let mut last_key = FullKey::<B>::default();
283 for (key, value) in kv_iter {
284 let is_new_user_key =
285 last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
286 if is_new_user_key {
287 last_key = key.clone();
288 }
289
290 b.add(key.to_ref(), value.as_slice()).await.unwrap();
291 }
292 let output = b.finish().await.unwrap();
293 output.writer_output.await.unwrap().unwrap();
294 output.sst_info.sst_info
295}
296
297pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
299 opts: SstableBuilderOptions,
300 object_id: u64,
301 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
302 sstable_store: SstableStoreRef,
303) -> (TableHolder, SstableInfo) {
304 let table_id_to_vnode = HashMap::from_iter(vec![(
305 TableId::default().as_raw_id(),
306 VirtualNode::COUNT_FOR_TEST,
307 )]);
308
309 let table_id_to_watermark_serde =
310 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
311
312 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
313 opts,
314 object_id,
315 kv_iter,
316 sstable_store.clone(),
317 CachePolicy::NotFill,
318 table_id_to_vnode,
319 table_id_to_watermark_serde,
320 )
321 .await;
322
323 (
324 sstable_store
325 .sstable(&sst_info, &mut StoreLocalStatistic::default())
326 .await
327 .unwrap(),
328 sst_info,
329 )
330}
331
332pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
333 opts: SstableBuilderOptions,
334 object_id: u64,
335 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
336 sstable_store: SstableStoreRef,
337 table_ids: Vec<u32>,
338) -> (TableHolder, SstableInfo) {
339 let table_id_to_vnode = table_ids
340 .iter()
341 .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
342 .collect();
343 let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
344
345 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
346 opts,
347 object_id,
348 kv_iter,
349 sstable_store.clone(),
350 CachePolicy::NotFill,
351 table_id_to_vnode,
352 table_id_to_watermark_serde,
353 )
354 .await;
355
356 (
357 sstable_store
358 .sstable(&sst_info, &mut StoreLocalStatistic::default())
359 .await
360 .unwrap(),
361 sst_info,
362 )
363}
364
365pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
367 opts: SstableBuilderOptions,
368 object_id: u64,
369 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
370 sstable_store: SstableStoreRef,
371) -> SstableInfo {
372 let table_id_to_vnode = HashMap::from_iter(vec![(
373 TableId::default().as_raw_id(),
374 VirtualNode::COUNT_FOR_TEST,
375 )]);
376
377 let table_id_to_watermark_serde =
378 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
379
380 gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
381 opts,
382 object_id,
383 kv_iter,
384 sstable_store,
385 CachePolicy::NotFill,
386 table_id_to_vnode,
387 table_id_to_watermark_serde,
388 )
389 .await
390}
391
392pub async fn gen_test_sstable_with_range_tombstone(
394 opts: SstableBuilderOptions,
395 object_id: u64,
396 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
397 sstable_store: SstableStoreRef,
398) -> SstableInfo {
399 let table_id_to_vnode = HashMap::from_iter(vec![(
400 TableId::default().as_raw_id(),
401 VirtualNode::COUNT_FOR_TEST,
402 )]);
403
404 let table_id_to_watermark_serde =
405 HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
406
407 gen_test_sstable_impl::<_, Xor16FilterBuilder>(
408 opts,
409 object_id,
410 kv_iter,
411 sstable_store.clone(),
412 CachePolicy::Fill(Hint::Normal),
413 table_id_to_vnode,
414 table_id_to_watermark_serde,
415 )
416 .await
417}
418
419pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
421 UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
422}
423
424pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
426 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
427 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
428 UserKey::for_test(TableId::default(), table_key)
429}
430
431pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
433 FullKey {
434 user_key: test_user_key_of(idx),
435 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
436 }
437}
438
/// Returns the `idx`-th test value.
///
/// The value is the byte pattern `"23332333"` repeated cyclically and truncated to
/// `idx % 100 + 1` bytes, so lengths range from 1 to 100.
pub fn test_value_of(idx: usize) -> Vec<u8> {
    const PATTERN: &[u8] = b"23332333";
    let len = idx % 100 + 1;
    PATTERN.iter().copied().cycle().take(len).collect()
}
449
450pub async fn gen_default_test_sstable(
454 opts: SstableBuilderOptions,
455 object_id: u64,
456 sstable_store: SstableStoreRef,
457) -> (TableHolder, SstableInfo) {
458 gen_test_sstable(
459 opts,
460 object_id,
461 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
462 sstable_store,
463 )
464 .await
465}
466
467pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
468 let mut c: usize = 0;
469 while i.try_next().await.unwrap().is_some() {
470 c += 1
471 }
472 c
473}
474
475pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
476 Arc::new(LruCache::new(1, 4, 0))
477}
478
479pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
480where
481 K: HybridKey,
482 V: HybridValue,
483{
484 HybridCacheBuilder::new()
485 .memory(10)
486 .storage()
487 .build()
488 .await
489 .unwrap()
490}
491
/// Test-only read options combining the per-read settings with the snapshot selection.
///
/// `table_id` and the two epoch flags are used to open a read snapshot. The remaining
/// fields are forwarded into `crate::store::ReadOptions` via `From`.
#[derive(Default, Clone)]
pub struct StateStoreTestReadOptions {
    /// Table whose snapshot is read (passed to `NewReadSnapshotOptions`).
    pub table_id: TableId,
    /// Forwarded to `crate::store::ReadOptions::prefix_hint`.
    pub prefix_hint: Option<Bytes>,
    /// Forwarded to `crate::store::ReadOptions::prefetch_options`.
    pub prefetch_options: PrefetchOptions,
    /// Forwarded to `crate::store::ReadOptions::cache_policy`.
    pub cache_policy: CachePolicy,
    /// When set (and `read_version_from_backup` is not), reads use
    /// `HummockReadEpoch::Committed`; otherwise `HummockReadEpoch::NoWait`.
    pub read_committed: bool,
    /// Forwarded to `crate::store::ReadOptions::retention_seconds`.
    pub retention_seconds: Option<u32>,
    /// When set, reads use `HummockReadEpoch::Backup`, taking precedence over
    /// `read_committed`.
    pub read_version_from_backup: bool,
}
502
503impl StateStoreTestReadOptions {
504 fn get_read_epoch(&self, epoch: u64) -> HummockReadEpoch {
505 if self.read_version_from_backup {
506 HummockReadEpoch::Backup(epoch)
507 } else if self.read_committed {
508 HummockReadEpoch::Committed(epoch)
509 } else {
510 HummockReadEpoch::NoWait(epoch)
511 }
512 }
513}
514
/// Local alias so tests can write `ReadOptions { .. }` for the test-only read options.
/// Note that it shadows the `crate::store::ReadOptions` brought in by the glob import.
pub type ReadOptions = StateStoreTestReadOptions;
516
517impl From<StateStoreTestReadOptions> for crate::store::ReadOptions {
518 fn from(val: StateStoreTestReadOptions) -> crate::store::ReadOptions {
519 crate::store::ReadOptions {
520 prefix_hint: val.prefix_hint,
521 prefetch_options: val.prefetch_options,
522 cache_policy: val.cache_policy,
523 retention_seconds: val.retention_seconds,
524 }
525 }
526}
527
528pub trait StateStoreReadTestExt: StateStore {
529 fn get_keyed_row(
533 &self,
534 key: TableKey<Bytes>,
535 epoch: u64,
536 read_options: ReadOptions,
537 ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
538
539 fn get(
543 &self,
544 key: TableKey<Bytes>,
545 epoch: u64,
546 read_options: ReadOptions,
547 ) -> impl StorageFuture<'_, Option<Bytes>> {
548 self.get_keyed_row(key, epoch, read_options)
549 .map_ok(|v| v.map(|(_, v)| v))
550 }
551
552 fn iter(
558 &self,
559 key_range: TableKeyRange,
560 epoch: u64,
561 read_options: ReadOptions,
562 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
563
564 fn rev_iter(
565 &self,
566 key_range: TableKeyRange,
567 epoch: u64,
568 read_options: ReadOptions,
569 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
570
571 fn scan(
572 &self,
573 key_range: TableKeyRange,
574 epoch: u64,
575 limit: Option<usize>,
576 read_options: ReadOptions,
577 ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
578}
579
580impl<S: StateStore> StateStoreReadTestExt for S {
581 async fn get_keyed_row(
582 &self,
583 key: TableKey<Bytes>,
584 epoch: u64,
585 read_options: ReadOptions,
586 ) -> StorageResult<Option<StateStoreKeyedRow>> {
587 let snapshot = self
588 .new_read_snapshot(
589 read_options.get_read_epoch(epoch),
590 NewReadSnapshotOptions {
591 table_id: read_options.table_id,
592 },
593 )
594 .await?;
595 snapshot
596 .on_key_value(key, read_options.into(), |key, value| {
597 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
598 })
599 .await
600 }
601
602 async fn iter(
603 &self,
604 key_range: TableKeyRange,
605 epoch: u64,
606 read_options: ReadOptions,
607 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
608 let snapshot = self
609 .new_read_snapshot(
610 read_options.get_read_epoch(epoch),
611 NewReadSnapshotOptions {
612 table_id: read_options.table_id,
613 },
614 )
615 .await?;
616 snapshot.iter(key_range, read_options.into()).await
617 }
618
619 async fn rev_iter(
620 &self,
621 key_range: TableKeyRange,
622 epoch: u64,
623 read_options: ReadOptions,
624 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
625 let snapshot = self
626 .new_read_snapshot(
627 read_options.get_read_epoch(epoch),
628 NewReadSnapshotOptions {
629 table_id: read_options.table_id,
630 },
631 )
632 .await?;
633 snapshot.rev_iter(key_range, read_options.into()).await
634 }
635
636 async fn scan(
637 &self,
638 key_range: TableKeyRange,
639 epoch: u64,
640 limit: Option<usize>,
641 read_options: ReadOptions,
642 ) -> StorageResult<Vec<StateStoreKeyedRow>> {
643 const MAX_INITIAL_CAP: usize = 1024;
644 let limit = limit.unwrap_or(usize::MAX);
645 let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
646 let mut iter = self.iter(key_range, epoch, read_options).await?;
647 while let Some((key, value)) = iter.try_next().await? {
648 ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
649 }
650 Ok(ret)
651 }
652}
653
/// Test convenience for point lookups on anything implementing [`StateStoreGet`]
/// (no epoch or snapshot selection: the receiver is read directly).
pub trait StateStoreGetTestExt: StateStoreGet {
    /// Reads the value stored under `key`, if any, as an owned [`Bytes`].
    fn get(
        &self,
        key: TableKey<Bytes>,
        read_options: ReadOptions,
    ) -> impl StorageFuture<'_, Option<Bytes>>;
}
661
662impl<S: StateStoreGet> StateStoreGetTestExt for S {
663 async fn get(
664 &self,
665 key: TableKey<Bytes>,
666 read_options: ReadOptions,
667 ) -> StorageResult<Option<Bytes>> {
668 self.on_key_value(key, read_options.into(), |_, value| {
669 Ok(Bytes::copy_from_slice(value))
670 })
671 .await
672 }
673}