1use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21 CacheHint, Engine, HybridCache, HybridCacheBuilder, StorageKey as HybridKey,
22 StorageValue as HybridValue,
23};
24use futures::TryFutureExt;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_common::config::EvictionConfig;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::test_epoch;
30use risingwave_common::util::row_serde::OrderedRowSerde;
31use risingwave_hummock_sdk::compaction_group::StateTableId;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
33use risingwave_hummock_sdk::key_range::KeyRange;
34use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
35use risingwave_hummock_sdk::{
36 EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
37};
38
39use super::iterator::test_utils::iterator_test_table_key_of;
40use super::{
41 DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
42};
43use crate::StateStore;
44use crate::compaction_catalog_manager::{
45 CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
46};
47use crate::error::StorageResult;
48use crate::hummock::shared_buffer::shared_buffer_batch::{
49 SharedBufferBatch, SharedBufferItem, SharedBufferValue,
50};
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53 BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
54 SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
55};
56use crate::monitor::StoreLocalStatistic;
57use crate::opts::StorageOpts;
58use crate::storage_value::StorageValue;
59use crate::store::*;
60
61pub fn default_opts_for_test() -> StorageOpts {
62 StorageOpts {
63 sstable_size_mb: 4,
64 block_size_kb: 64,
65 bloom_false_positive: 0.1,
66 share_buffers_sync_parallelism: 2,
67 share_buffer_compaction_worker_threads_number: 1,
68 shared_buffer_capacity_mb: 64,
69 data_directory: "hummock_001".to_owned(),
70 write_conflict_detection_enabled: true,
71 block_cache_capacity_mb: 64,
72 meta_cache_capacity_mb: 64,
73 block_cache_eviction_config: EvictionConfig::for_test(),
74 disable_remote_compactor: false,
75 share_buffer_upload_concurrency: 1,
76 compactor_memory_limit_mb: 64,
77 sstable_id_remote_fetch_number: 1,
78 ..Default::default()
79 }
80}
81
82pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
83 vec![(
84 TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
85 SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
86 )]
87}
88
89pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
90 let mut kvs = vec![];
91 let v = Bytes::from(b"value1".to_vec().repeat(100));
92 for idx in 0..n {
93 kvs.push((
94 TableKey(Bytes::from(iterator_test_table_key_of(idx))),
95 StorageValue::new_put(v.clone()),
96 ));
97 }
98 kvs
99}
100
101pub fn gen_dummy_sst_info(
102 id: HummockSstableObjectId,
103 batches: Vec<SharedBufferBatch>,
104 table_id: TableId,
105 epoch: HummockEpoch,
106) -> SstableInfo {
107 let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
108 let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
109 let mut file_size = 0;
110 for batch in batches.iter().skip(1) {
111 if min_table_key.as_slice() > *batch.start_table_key() {
112 min_table_key = batch.start_table_key().to_vec();
113 }
114 if max_table_key.as_slice() < *batch.end_table_key() {
115 max_table_key = batch.end_table_key().to_vec();
116 }
117 file_size += batch.size() as u64;
118 }
119 SstableInfoInner {
120 object_id: id,
121 sst_id: id,
122 key_range: KeyRange {
123 left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
124 right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
125 right_exclusive: false,
126 },
127 file_size,
128 table_ids: vec![table_id.table_id],
129 uncompressed_file_size: file_size,
130 min_epoch: epoch,
131 max_epoch: epoch,
132 sst_size: file_size,
133 ..Default::default()
134 }
135 .into()
136}
137
/// Number of key-value pairs written by `gen_default_test_sstable`.
pub const TEST_KEYS_COUNT: usize = 10_000;
140
141pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
142 SstableBuilderOptions {
143 capacity: 256 * (1 << 20), block_capacity: 4096, restart_interval: DEFAULT_RESTART_INTERVAL,
146 bloom_false_positive: 0.1,
147 ..Default::default()
148 }
149}
150
151pub fn default_writer_opt_for_test() -> SstableWriterOptions {
152 SstableWriterOptions {
153 capacity_hint: None,
154 tracker: None,
155 policy: CachePolicy::Disable,
156 }
157}
158
159pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
160 InMemWriter::from(opt)
161}
162
163pub async fn gen_test_sstable_data(
165 opts: SstableBuilderOptions,
166 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
167) -> (Bytes, SstableMeta) {
168 let table_id_to_vnode = HashMap::from_iter(vec![(
169 TableId::default().table_id(),
170 VirtualNode::COUNT_FOR_TEST,
171 )]);
172 let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
173 let mut b = SstableBuilder::for_test(
174 0,
175 mock_sst_writer(&opts),
176 opts,
177 table_id_to_vnode,
178 table_id_to_watermark_serde,
179 );
180 for (key, value) in kv_iter {
181 b.add_for_test(key.to_ref(), value.as_slice())
182 .await
183 .unwrap();
184 }
185 let output = b.finish().await.unwrap();
186 output.writer_output
187}
188
189pub async fn put_sst(
191 sst_object_id: HummockSstableObjectId,
192 data: Bytes,
193 mut meta: SstableMeta,
194 sstable_store: SstableStoreRef,
195 mut options: SstableWriterOptions,
196 table_ids: Vec<u32>,
197) -> HummockResult<SstableInfo> {
198 options.policy = CachePolicy::NotFill;
199 let mut writer = sstable_store
200 .clone()
201 .create_sst_writer(sst_object_id, options);
202 for block_meta in &meta.block_metas {
203 let offset = block_meta.offset as usize;
204 let end_offset = offset + block_meta.len as usize;
205 writer
206 .write_block(&data[offset..end_offset], block_meta)
207 .await?;
208 }
209
210 let bloom_filter = {
212 let mut filter_builder = BlockedXor16FilterBuilder::new(100);
213 for _ in &meta.block_metas {
214 filter_builder.switch_block(None);
215 }
216
217 filter_builder.finish(None)
218 };
219
220 meta.meta_offset = writer.data_len() as u64;
221 meta.bloom_filter = bloom_filter;
222 let sst = SstableInfoInner {
223 object_id: sst_object_id,
224 sst_id: sst_object_id,
225 key_range: KeyRange {
226 left: Bytes::from(meta.smallest_key.clone()),
227 right: Bytes::from(meta.largest_key.clone()),
228 right_exclusive: false,
229 },
230 file_size: meta.estimated_size as u64,
231 meta_offset: meta.meta_offset,
232 uncompressed_file_size: meta.estimated_size as u64,
233 table_ids,
234 ..Default::default()
235 }
236 .into();
237 let writer_output = writer.finish(meta).await?;
238 writer_output.await.unwrap()?;
239 Ok(sst)
240}
241
242pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
244 opts: SstableBuilderOptions,
245 object_id: HummockSstableObjectId,
246 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
247 sstable_store: SstableStoreRef,
248 policy: CachePolicy,
249 table_id_to_vnode: HashMap<u32, usize>,
250 table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
251) -> SstableInfo {
252 let writer_opts = SstableWriterOptions {
253 capacity_hint: None,
254 tracker: None,
255 policy,
256 };
257 let writer = sstable_store
258 .clone()
259 .create_sst_writer(object_id, writer_opts);
260
261 let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
262 FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
263 table_id_to_vnode,
264 table_id_to_watermark_serde,
265 ));
266
267 let mut b = SstableBuilder::<_, F>::new(
268 object_id,
269 writer,
270 F::create(opts.bloom_false_positive, opts.capacity / 16),
271 opts,
272 compaction_catalog_agent_ref,
273 None,
274 );
275
276 let mut last_key = FullKey::<B>::default();
277 for (key, value) in kv_iter {
278 let is_new_user_key =
279 last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
280 if is_new_user_key {
281 last_key = key.clone();
282 }
283
284 b.add(key.to_ref(), value.as_slice()).await.unwrap();
285 }
286 let output = b.finish().await.unwrap();
287 output.writer_output.await.unwrap().unwrap();
288 output.sst_info.sst_info
289}
290
291pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
293 opts: SstableBuilderOptions,
294 object_id: HummockSstableObjectId,
295 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
296 sstable_store: SstableStoreRef,
297) -> (TableHolder, SstableInfo) {
298 let table_id_to_vnode = HashMap::from_iter(vec![(
299 TableId::default().table_id(),
300 VirtualNode::COUNT_FOR_TEST,
301 )]);
302
303 let table_id_to_watermark_serde =
304 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
305
306 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
307 opts,
308 object_id,
309 kv_iter,
310 sstable_store.clone(),
311 CachePolicy::NotFill,
312 table_id_to_vnode,
313 table_id_to_watermark_serde,
314 )
315 .await;
316
317 (
318 sstable_store
319 .sstable(&sst_info, &mut StoreLocalStatistic::default())
320 .await
321 .unwrap(),
322 sst_info,
323 )
324}
325
326pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
327 opts: SstableBuilderOptions,
328 object_id: HummockSstableObjectId,
329 kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
330 sstable_store: SstableStoreRef,
331 table_ids: Vec<StateTableId>,
332) -> (TableHolder, SstableInfo) {
333 let table_id_to_vnode = table_ids
334 .iter()
335 .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
336 .collect();
337 let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
338
339 let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
340 opts,
341 object_id,
342 kv_iter,
343 sstable_store.clone(),
344 CachePolicy::NotFill,
345 table_id_to_vnode,
346 table_id_to_watermark_serde,
347 )
348 .await;
349
350 (
351 sstable_store
352 .sstable(&sst_info, &mut StoreLocalStatistic::default())
353 .await
354 .unwrap(),
355 sst_info,
356 )
357}
358
359pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
361 opts: SstableBuilderOptions,
362 object_id: HummockSstableObjectId,
363 kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
364 sstable_store: SstableStoreRef,
365) -> SstableInfo {
366 let table_id_to_vnode = HashMap::from_iter(vec![(
367 TableId::default().table_id(),
368 VirtualNode::COUNT_FOR_TEST,
369 )]);
370
371 let table_id_to_watermark_serde =
372 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
373
374 gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
375 opts,
376 object_id,
377 kv_iter,
378 sstable_store,
379 CachePolicy::NotFill,
380 table_id_to_vnode,
381 table_id_to_watermark_serde,
382 )
383 .await
384}
385
386pub async fn gen_test_sstable_with_range_tombstone(
388 opts: SstableBuilderOptions,
389 object_id: HummockSstableObjectId,
390 kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
391 sstable_store: SstableStoreRef,
392) -> SstableInfo {
393 let table_id_to_vnode = HashMap::from_iter(vec![(
394 TableId::default().table_id(),
395 VirtualNode::COUNT_FOR_TEST,
396 )]);
397
398 let table_id_to_watermark_serde =
399 HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
400
401 gen_test_sstable_impl::<_, Xor16FilterBuilder>(
402 opts,
403 object_id,
404 kv_iter,
405 sstable_store.clone(),
406 CachePolicy::Fill(CacheHint::Normal),
407 table_id_to_vnode,
408 table_id_to_watermark_serde,
409 )
410 .await
411}
412
413pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
415 UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
416}
417
418pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
420 let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
421 table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
422 UserKey::for_test(TableId::default(), table_key)
423}
424
425pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
427 FullKey {
428 user_key: test_user_key_of(idx),
429 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
430 }
431}
432
433pub fn test_value_of(idx: usize) -> Vec<u8> {
435 "23332333"
436 .as_bytes()
437 .iter()
438 .cycle()
439 .cloned()
440 .take(idx % 100 + 1) .collect_vec()
442}
443
444pub async fn gen_default_test_sstable(
448 opts: SstableBuilderOptions,
449 object_id: HummockSstableObjectId,
450 sstable_store: SstableStoreRef,
451) -> (TableHolder, SstableInfo) {
452 gen_test_sstable(
453 opts,
454 object_id,
455 (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
456 sstable_store,
457 )
458 .await
459}
460
461pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
462 let mut c: usize = 0;
463 while i.try_next().await.unwrap().is_some() {
464 c += 1
465 }
466 c
467}
468
469pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
470 Arc::new(LruCache::new(1, 4, 0))
471}
472
473pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
474where
475 K: HybridKey,
476 V: HybridValue,
477{
478 HybridCacheBuilder::new()
479 .memory(10)
480 .storage(Engine::Large)
481 .build()
482 .await
483 .unwrap()
484}
485
486pub trait StateStoreReadTestExt: StateStore {
487 fn get_keyed_row(
491 &self,
492 key: TableKey<Bytes>,
493 epoch: u64,
494 read_options: ReadOptions,
495 ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
496
497 fn get(
501 &self,
502 key: TableKey<Bytes>,
503 epoch: u64,
504 read_options: ReadOptions,
505 ) -> impl StorageFuture<'_, Option<Bytes>> {
506 self.get_keyed_row(key, epoch, read_options)
507 .map_ok(|v| v.map(|(_, v)| v))
508 }
509
510 fn iter(
516 &self,
517 key_range: TableKeyRange,
518 epoch: u64,
519 read_options: ReadOptions,
520 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
521
522 fn rev_iter(
523 &self,
524 key_range: TableKeyRange,
525 epoch: u64,
526 read_options: ReadOptions,
527 ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
528
529 fn scan(
530 &self,
531 key_range: TableKeyRange,
532 epoch: u64,
533 limit: Option<usize>,
534 read_options: ReadOptions,
535 ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
536}
537
538impl<S: StateStore> StateStoreReadTestExt for S {
539 async fn get_keyed_row(
540 &self,
541 key: TableKey<Bytes>,
542 epoch: u64,
543 read_options: ReadOptions,
544 ) -> StorageResult<Option<StateStoreKeyedRow>> {
545 let snapshot = self
546 .new_read_snapshot(
547 HummockReadEpoch::NoWait(epoch),
548 NewReadSnapshotOptions {
549 table_id: read_options.table_id,
550 },
551 )
552 .await?;
553 snapshot.get_keyed_row(key, read_options).await
554 }
555
556 async fn iter(
557 &self,
558 key_range: TableKeyRange,
559 epoch: u64,
560 read_options: ReadOptions,
561 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
562 let snapshot = self
563 .new_read_snapshot(
564 HummockReadEpoch::NoWait(epoch),
565 NewReadSnapshotOptions {
566 table_id: read_options.table_id,
567 },
568 )
569 .await?;
570 snapshot.iter(key_range, read_options).await
571 }
572
573 async fn rev_iter(
574 &self,
575 key_range: TableKeyRange,
576 epoch: u64,
577 read_options: ReadOptions,
578 ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
579 let snapshot = self
580 .new_read_snapshot(
581 HummockReadEpoch::NoWait(epoch),
582 NewReadSnapshotOptions {
583 table_id: read_options.table_id,
584 },
585 )
586 .await?;
587 snapshot.rev_iter(key_range, read_options).await
588 }
589
590 async fn scan(
591 &self,
592 key_range: TableKeyRange,
593 epoch: u64,
594 limit: Option<usize>,
595 mut read_options: ReadOptions,
596 ) -> StorageResult<Vec<StateStoreKeyedRow>> {
597 if limit.is_some() {
598 read_options.prefetch_options.prefetch = false;
599 }
600 const MAX_INITIAL_CAP: usize = 1024;
601 let limit = limit.unwrap_or(usize::MAX);
602 let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
603 let mut iter = self.iter(key_range, epoch, read_options).await?;
604 while let Some((key, value)) = iter.try_next().await? {
605 ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
606 }
607 Ok(ret)
608 }
609}