1use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21 Engine, Hint, HybridCache, HybridCacheBuilder, LargeEngineOptions, StorageKey as HybridKey,
22 StorageValue as HybridValue,
23};
24use futures::TryFutureExt;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_common::config::EvictionConfig;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::test_epoch;
30use risingwave_common::util::row_serde::OrderedRowSerde;
31use risingwave_hummock_sdk::compaction_group::StateTableId;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
33use risingwave_hummock_sdk::key_range::KeyRange;
34use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
35use risingwave_hummock_sdk::{
36 EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
37};
38
39use super::iterator::test_utils::iterator_test_table_key_of;
40use super::{
41 DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
42};
43use crate::StateStore;
44use crate::compaction_catalog_manager::{
45 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
46};
47use crate::error::StorageResult;
48use crate::hummock::shared_buffer::shared_buffer_batch::{
49 SharedBufferBatch, SharedBufferItem, SharedBufferValue,
50};
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53 BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
54 SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
55};
56use crate::monitor::StoreLocalStatistic;
57use crate::opts::StorageOpts;
58use crate::storage_value::StorageValue;
59use crate::store::*;
60
61pub fn default_opts_for_test() -> StorageOpts {
62 StorageOpts {
63 sstable_size_mb: 4,
64 block_size_kb: 64,
65 bloom_false_positive: 0.1,
66 share_buffers_sync_parallelism: 2,
67 share_buffer_compaction_worker_threads_number: 1,
68 shared_buffer_capacity_mb: 64,
69 data_directory: "hummock_001".to_owned(),
70 write_conflict_detection_enabled: true,
71 block_cache_capacity_mb: 64,
72 meta_cache_capacity_mb: 64,
73 block_cache_eviction_config: EvictionConfig::for_test(),
74 disable_remote_compactor: false,
75 share_buffer_upload_concurrency: 1,
76 compactor_memory_limit_mb: 64,
77 sstable_id_remote_fetch_number: 1,
78 vector_file_block_size_kb: 8,
79 ..Default::default()
80 }
81}
82
83pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
84 vec![(
85 TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
86 SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
87 )]
88}
89
90pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
91 let mut kvs = vec![];
92 let v = Bytes::from(b"value1".to_vec().repeat(100));
93 for idx in 0..n {
94 kvs.push((
95 TableKey(Bytes::from(iterator_test_table_key_of(idx))),
96 StorageValue::new_put(v.clone()),
97 ));
98 }
99 kvs
100}
101
102pub fn gen_dummy_sst_info(
103 id: u64,
104 batches: Vec<SharedBufferBatch>,
105 table_id: TableId,
106 epoch: HummockEpoch,
107) -> SstableInfo {
108 let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
109 let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
110 let mut file_size = 0;
111 for batch in batches.iter().skip(1) {
112 if min_table_key.as_slice() > *batch.start_table_key() {
113 min_table_key = batch.start_table_key().to_vec();
114 }
115 if max_table_key.as_slice() < *batch.end_table_key() {
116 max_table_key = batch.end_table_key().to_vec();
117 }
118 file_size += batch.size() as u64;
119 }
120 SstableInfoInner {
121 object_id: id.into(),
122 sst_id: id.into(),
123 key_range: KeyRange {
124 left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
125 right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
126 right_exclusive: false,
127 },
128 file_size,
129 table_ids: vec![table_id.table_id],
130 uncompressed_file_size: file_size,
131 min_epoch: epoch,
132 max_epoch: epoch,
133 sst_size: file_size,
134 ..Default::default()
135 }
136 .into()
137}
138
/// Number of key-value pairs written by `gen_default_test_sstable`.
pub const TEST_KEYS_COUNT: usize = 10_000;
141
142pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
143 SstableBuilderOptions {
144 capacity: 256 * (1 << 20), block_capacity: 4096, restart_interval: DEFAULT_RESTART_INTERVAL,
147 bloom_false_positive: 0.1,
148 ..Default::default()
149 }
150}
151
152pub fn default_writer_opt_for_test() -> SstableWriterOptions {
153 SstableWriterOptions {
154 capacity_hint: None,
155 tracker: None,
156 policy: CachePolicy::Disable,
157 }
158}
159
160pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
161 InMemWriter::from(opt)
162}
163
164pub async fn gen_test_sstable_data(
166 opts: SstableBuilderOptions,
167 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
168) -> (Bytes, SstableMeta) {
169 let table_id_to_vnode = HashMap::from_iter(vec![(
170 TableId::default().table_id(),
171 VirtualNode::COUNT_FOR_TEST,
172 )]);
173 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
174 let mut b = SstableBuilder::for_test(
175 0,
176 mock_sst_writer(&opts),
177 opts,
178 table_id_to_vnode,
179 table_id_to_watermark_serde,
180 );
181 for (key, value) in kv_iter {
182 b.add_for_test(key.to_ref(), value.as_slice())
183 .await
184 .unwrap();
185 }
186 let output = b.finish().await.unwrap();
187 output.writer_output
188}
189
190pub async fn put_sst(
192 sst_object_id: u64,
193 data: Bytes,
194 mut meta: SstableMeta,
195 sstable_store: SstableStoreRef,
196 mut options: SstableWriterOptions,
197 table_ids: Vec<u32>,
198) -> HummockResult<SstableInfo> {
199 options.policy = CachePolicy::NotFill;
200 let mut writer = sstable_store
201 .clone()
202 .create_sst_writer(sst_object_id, options);
203 for block_meta in &meta.block_metas {
204 let offset = block_meta.offset as usize;
205 let end_offset = offset + block_meta.len as usize;
206 writer
207 .write_block(&data[offset..end_offset], block_meta)
208 .await?;
209 }
210
211 let bloom_filter = {
213 let mut filter_builder = BlockedXor16FilterBuilder::new(100);
214 for _ in &meta.block_metas {
215 filter_builder.switch_block(None);
216 }
217
218 filter_builder.finish(None)
219 };
220
221 meta.meta_offset = writer.data_len() as u64;
222 meta.bloom_filter = bloom_filter;
223 let sst = SstableInfoInner {
224 object_id: sst_object_id.into(),
225 sst_id: sst_object_id.into(),
226 key_range: KeyRange {
227 left: Bytes::from(meta.smallest_key.clone()),
228 right: Bytes::from(meta.largest_key.clone()),
229 right_exclusive: false,
230 },
231 file_size: meta.estimated_size as u64,
232 meta_offset: meta.meta_offset,
233 uncompressed_file_size: meta.estimated_size as u64,
234 table_ids,
235 ..Default::default()
236 }
237 .into();
238 let writer_output = writer.finish(meta).await?;
239 writer_output.await.unwrap()?;
240 Ok(sst)
241}
242
243pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
245 opts: SstableBuilderOptions,
246 object_id: u64,
247 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
248 sstable_store: SstableStoreRef,
249 policy: CachePolicy,
250 table_id_to_vnode: HashMap<u32, usize>,
251 table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
252) -> SstableInfo {
253 let writer_opts = SstableWriterOptions {
254 capacity_hint: None,
255 tracker: None,
256 policy,
257 };
258 let writer = sstable_store
259 .clone()
260 .create_sst_writer(object_id, writer_opts);
261
262 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
263 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
264 table_id_to_vnode,
265 table_id_to_watermark_serde,
266 ));
267
268 let mut b = SstableBuilder::<_, F>::new(
269 object_id,
270 writer,
271 F::create(opts.bloom_false_positive, opts.capacity / 16),
272 opts,
273 compaction_catalog_agent_ref,
274 None,
275 );
276
277 let mut last_key = FullKey::<B>::default();
278 for (key, value) in kv_iter {
279 let is_new_user_key =
280 last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
281 if is_new_user_key {
282 last_key = key.clone();
283 }
284
285 b.add(key.to_ref(), value.as_slice()).await.unwrap();
286 }
287 let output = b.finish().await.unwrap();
288 output.writer_output.await.unwrap().unwrap();
289 output.sst_info.sst_info
290}
291
292pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
294 opts: SstableBuilderOptions,
295 object_id: u64,
296 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
297 sstable_store: SstableStoreRef,
298) -> (TableHolder, SstableInfo) {
299 let table_id_to_vnode = HashMap::from_iter(vec![(
300 TableId::default().table_id(),
301 VirtualNode::COUNT_FOR_TEST,
302 )]);
303
304 let table_id_to_watermark_serde =
305 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
306
307 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
308 opts,
309 object_id,
310 kv_iter,
311 sstable_store.clone(),
312 CachePolicy::NotFill,
313 table_id_to_vnode,
314 table_id_to_watermark_serde,
315 )
316 .await;
317
318 (
319 sstable_store
320 .sstable(&sst_info, &mut StoreLocalStatistic::default())
321 .await
322 .unwrap(),
323 sst_info,
324 )
325}
326
327pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
328 opts: SstableBuilderOptions,
329 object_id: u64,
330 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
331 sstable_store: SstableStoreRef,
332 table_ids: Vec<StateTableId>,
333) -> (TableHolder, SstableInfo) {
334 let table_id_to_vnode = table_ids
335 .iter()
336 .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
337 .collect();
338 let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
339
340 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
341 opts,
342 object_id,
343 kv_iter,
344 sstable_store.clone(),
345 CachePolicy::NotFill,
346 table_id_to_vnode,
347 table_id_to_watermark_serde,
348 )
349 .await;
350
351 (
352 sstable_store
353 .sstable(&sst_info, &mut StoreLocalStatistic::default())
354 .await
355 .unwrap(),
356 sst_info,
357 )
358}
359
360pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
362 opts: SstableBuilderOptions,
363 object_id: u64,
364 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
365 sstable_store: SstableStoreRef,
366) -> SstableInfo {
367 let table_id_to_vnode = HashMap::from_iter(vec![(
368 TableId::default().table_id(),
369 VirtualNode::COUNT_FOR_TEST,
370 )]);
371
372 let table_id_to_watermark_serde =
373 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
374
375 gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
376 opts,
377 object_id,
378 kv_iter,
379 sstable_store,
380 CachePolicy::NotFill,
381 table_id_to_vnode,
382 table_id_to_watermark_serde,
383 )
384 .await
385}
386
387pub async fn gen_test_sstable_with_range_tombstone(
389 opts: SstableBuilderOptions,
390 object_id: u64,
391 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
392 sstable_store: SstableStoreRef,
393) -> SstableInfo {
394 let table_id_to_vnode = HashMap::from_iter(vec![(
395 TableId::default().table_id(),
396 VirtualNode::COUNT_FOR_TEST,
397 )]);
398
399 let table_id_to_watermark_serde =
400 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
401
402 gen_test_sstable_impl::<_, Xor16FilterBuilder>(
403 opts,
404 object_id,
405 kv_iter,
406 sstable_store.clone(),
407 CachePolicy::Fill(Hint::Normal),
408 table_id_to_vnode,
409 table_id_to_watermark_serde,
410 )
411 .await
412}
413
414pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
416 UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
417}
418
419pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
421 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
422 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
423 UserKey::for_test(TableId::default(), table_key)
424}
425
426pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
428 FullKey {
429 user_key: test_user_key_of(idx),
430 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
431 }
432}
433
434pub fn test_value_of(idx: usize) -> Vec<u8> {
436 "23332333"
437 .as_bytes()
438 .iter()
439 .cycle()
440 .cloned()
441 .take(idx % 100 + 1) .collect_vec()
443}
444
445pub async fn gen_default_test_sstable(
449 opts: SstableBuilderOptions,
450 object_id: u64,
451 sstable_store: SstableStoreRef,
452) -> (TableHolder, SstableInfo) {
453 gen_test_sstable(
454 opts,
455 object_id,
456 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
457 sstable_store,
458 )
459 .await
460}
461
462pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
463 let mut c: usize = 0;
464 while i.try_next().await.unwrap().is_some() {
465 c += 1
466 }
467 c
468}
469
470pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
471 Arc::new(LruCache::new(1, 4, 0))
472}
473
474pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
475where
476 K: HybridKey,
477 V: HybridValue,
478{
479 HybridCacheBuilder::new()
480 .memory(10)
481 .storage(Engine::Large(LargeEngineOptions::new()))
482 .build()
483 .await
484 .unwrap()
485}
486
487#[derive(Default, Clone)]
488pub struct StateStoreTestReadOptions {
489 pub table_id: TableId,
490 pub prefix_hint: Option<Bytes>,
491 pub prefetch_options: PrefetchOptions,
492 pub cache_policy: CachePolicy,
493 pub read_committed: bool,
494 pub retention_seconds: Option<u32>,
495 pub read_version_from_backup: bool,
496}
497
498impl StateStoreTestReadOptions {
499 fn get_read_epoch(&self, epoch: u64) -> HummockReadEpoch {
500 if self.read_version_from_backup {
501 HummockReadEpoch::Backup(epoch)
502 } else if self.read_committed {
503 HummockReadEpoch::Committed(epoch)
504 } else {
505 HummockReadEpoch::NoWait(epoch)
506 }
507 }
508}
509
510pub type ReadOptions = StateStoreTestReadOptions;
511
512impl From<StateStoreTestReadOptions> for crate::store::ReadOptions {
513 fn from(val: StateStoreTestReadOptions) -> crate::store::ReadOptions {
514 crate::store::ReadOptions {
515 prefix_hint: val.prefix_hint,
516 prefetch_options: val.prefetch_options,
517 cache_policy: val.cache_policy,
518 retention_seconds: val.retention_seconds,
519 }
520 }
521}
522
523pub trait StateStoreReadTestExt: StateStore {
524 fn get_keyed_row(
528 &self,
529 key: TableKey<Bytes>,
530 epoch: u64,
531 read_options: ReadOptions,
532 ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
533
534 fn get(
538 &self,
539 key: TableKey<Bytes>,
540 epoch: u64,
541 read_options: ReadOptions,
542 ) -> impl StorageFuture<'_, Option<Bytes>> {
543 self.get_keyed_row(key, epoch, read_options)
544 .map_ok(|v| v.map(|(_, v)| v))
545 }
546
547 fn iter(
553 &self,
554 key_range: TableKeyRange,
555 epoch: u64,
556 read_options: ReadOptions,
557 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
558
559 fn rev_iter(
560 &self,
561 key_range: TableKeyRange,
562 epoch: u64,
563 read_options: ReadOptions,
564 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
565
566 fn scan(
567 &self,
568 key_range: TableKeyRange,
569 epoch: u64,
570 limit: Option<usize>,
571 read_options: ReadOptions,
572 ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
573}
574
575impl<S: StateStore> StateStoreReadTestExt for S {
576 async fn get_keyed_row(
577 &self,
578 key: TableKey<Bytes>,
579 epoch: u64,
580 read_options: ReadOptions,
581 ) -> StorageResult<Option<StateStoreKeyedRow>> {
582 let snapshot = self
583 .new_read_snapshot(
584 read_options.get_read_epoch(epoch),
585 NewReadSnapshotOptions {
586 table_id: read_options.table_id,
587 },
588 )
589 .await?;
590 snapshot
591 .on_key_value(key, read_options.into(), |key, value| {
592 Ok((key.copy_into(), Bytes::copy_from_slice(value)))
593 })
594 .await
595 }
596
597 async fn iter(
598 &self,
599 key_range: TableKeyRange,
600 epoch: u64,
601 read_options: ReadOptions,
602 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
603 let snapshot = self
604 .new_read_snapshot(
605 read_options.get_read_epoch(epoch),
606 NewReadSnapshotOptions {
607 table_id: read_options.table_id,
608 },
609 )
610 .await?;
611 snapshot.iter(key_range, read_options.into()).await
612 }
613
614 async fn rev_iter(
615 &self,
616 key_range: TableKeyRange,
617 epoch: u64,
618 read_options: ReadOptions,
619 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
620 let snapshot = self
621 .new_read_snapshot(
622 read_options.get_read_epoch(epoch),
623 NewReadSnapshotOptions {
624 table_id: read_options.table_id,
625 },
626 )
627 .await?;
628 snapshot.rev_iter(key_range, read_options.into()).await
629 }
630
631 async fn scan(
632 &self,
633 key_range: TableKeyRange,
634 epoch: u64,
635 limit: Option<usize>,
636 read_options: ReadOptions,
637 ) -> StorageResult<Vec<StateStoreKeyedRow>> {
638 const MAX_INITIAL_CAP: usize = 1024;
639 let limit = limit.unwrap_or(usize::MAX);
640 let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
641 let mut iter = self.iter(key_range, epoch, read_options).await?;
642 while let Some((key, value)) = iter.try_next().await? {
643 ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
644 }
645 Ok(ret)
646 }
647}
648
649pub trait StateStoreGetTestExt: StateStoreGet {
650 fn get(
651 &self,
652 key: TableKey<Bytes>,
653 read_options: ReadOptions,
654 ) -> impl StorageFuture<'_, Option<Bytes>>;
655}
656
657impl<S: StateStoreGet> StateStoreGetTestExt for S {
658 async fn get(
659 &self,
660 key: TableKey<Bytes>,
661 read_options: ReadOptions,
662 ) -> StorageResult<Option<Bytes>> {
663 self.on_key_value(key, read_options.into(), |_, value| {
664 Ok(Bytes::copy_from_slice(value))
665 })
666 .await
667 }
668}