risingwave_storage/hummock/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use foyer::{
21    CacheHint, Engine, HybridCache, HybridCacheBuilder, StorageKey as HybridKey,
22    StorageValue as HybridValue,
23};
24use futures::TryFutureExt;
25use itertools::Itertools;
26use risingwave_common::catalog::TableId;
27use risingwave_common::config::EvictionConfig;
28use risingwave_common::hash::VirtualNode;
29use risingwave_common::util::epoch::test_epoch;
30use risingwave_common::util::row_serde::OrderedRowSerde;
31use risingwave_hummock_sdk::compaction_group::StateTableId;
32use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey};
33use risingwave_hummock_sdk::key_range::KeyRange;
34use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
35use risingwave_hummock_sdk::{
36    EpochWithGap, HummockEpoch, HummockReadEpoch, HummockSstableObjectId,
37};
38
39use super::iterator::test_utils::iterator_test_table_key_of;
40use super::{
41    DEFAULT_RESTART_INTERVAL, HummockResult, InMemWriter, SstableMeta, SstableWriterOptions,
42};
43use crate::StateStore;
44use crate::compaction_catalog_manager::{
45    CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
46};
47use crate::error::StorageResult;
48use crate::hummock::shared_buffer::shared_buffer_batch::{
49    SharedBufferBatch, SharedBufferItem, SharedBufferValue,
50};
51use crate::hummock::value::HummockValue;
52use crate::hummock::{
53    BlockedXor16FilterBuilder, CachePolicy, FilterBuilder, LruCache, Sstable, SstableBuilder,
54    SstableBuilderOptions, SstableStoreRef, SstableWriter, TableHolder, Xor16FilterBuilder,
55};
56use crate::monitor::StoreLocalStatistic;
57use crate::opts::StorageOpts;
58use crate::storage_value::StorageValue;
59use crate::store::*;
60
61pub fn default_opts_for_test() -> StorageOpts {
62    StorageOpts {
63        sstable_size_mb: 4,
64        block_size_kb: 64,
65        bloom_false_positive: 0.1,
66        share_buffers_sync_parallelism: 2,
67        share_buffer_compaction_worker_threads_number: 1,
68        shared_buffer_capacity_mb: 64,
69        data_directory: "hummock_001".to_owned(),
70        write_conflict_detection_enabled: true,
71        block_cache_capacity_mb: 64,
72        meta_cache_capacity_mb: 64,
73        block_cache_eviction_config: EvictionConfig::for_test(),
74        disable_remote_compactor: false,
75        share_buffer_upload_concurrency: 1,
76        compactor_memory_limit_mb: 64,
77        sstable_id_remote_fetch_number: 1,
78        ..Default::default()
79    }
80}
81
82pub fn gen_dummy_batch(n: u64) -> Vec<SharedBufferItem> {
83    vec![(
84        TableKey(Bytes::from(iterator_test_table_key_of(n as usize))),
85        SharedBufferValue::Insert(Bytes::copy_from_slice(&b"value1"[..])),
86    )]
87}
88
89pub fn gen_dummy_batch_several_keys(n: usize) -> Vec<(TableKey<Bytes>, StorageValue)> {
90    let mut kvs = vec![];
91    let v = Bytes::from(b"value1".to_vec().repeat(100));
92    for idx in 0..n {
93        kvs.push((
94            TableKey(Bytes::from(iterator_test_table_key_of(idx))),
95            StorageValue::new_put(v.clone()),
96        ));
97    }
98    kvs
99}
100
101pub fn gen_dummy_sst_info(
102    id: HummockSstableObjectId,
103    batches: Vec<SharedBufferBatch>,
104    table_id: TableId,
105    epoch: HummockEpoch,
106) -> SstableInfo {
107    let mut min_table_key: Vec<u8> = batches[0].start_table_key().to_vec();
108    let mut max_table_key: Vec<u8> = batches[0].end_table_key().to_vec();
109    let mut file_size = 0;
110    for batch in batches.iter().skip(1) {
111        if min_table_key.as_slice() > *batch.start_table_key() {
112            min_table_key = batch.start_table_key().to_vec();
113        }
114        if max_table_key.as_slice() < *batch.end_table_key() {
115            max_table_key = batch.end_table_key().to_vec();
116        }
117        file_size += batch.size() as u64;
118    }
119    SstableInfoInner {
120        object_id: id,
121        sst_id: id,
122        key_range: KeyRange {
123            left: Bytes::from(FullKey::for_test(table_id, min_table_key, epoch).encode()),
124            right: Bytes::from(FullKey::for_test(table_id, max_table_key, epoch).encode()),
125            right_exclusive: false,
126        },
127        file_size,
128        table_ids: vec![table_id.table_id],
129        uncompressed_file_size: file_size,
130        min_epoch: epoch,
131        max_epoch: epoch,
132        sst_size: file_size,
133        ..Default::default()
134    }
135    .into()
136}
137
/// Number of keys in the table generated by `gen_default_test_sstable`.
pub const TEST_KEYS_COUNT: usize = 10000;
140
141pub fn default_builder_opt_for_test() -> SstableBuilderOptions {
142    SstableBuilderOptions {
143        capacity: 256 * (1 << 20), // 256MB
144        block_capacity: 4096,      // 4KB
145        restart_interval: DEFAULT_RESTART_INTERVAL,
146        bloom_false_positive: 0.1,
147        ..Default::default()
148    }
149}
150
151pub fn default_writer_opt_for_test() -> SstableWriterOptions {
152    SstableWriterOptions {
153        capacity_hint: None,
154        tracker: None,
155        policy: CachePolicy::Disable,
156    }
157}
158
159pub fn mock_sst_writer(opt: &SstableBuilderOptions) -> InMemWriter {
160    InMemWriter::from(opt)
161}
162
163/// Generates sstable data and metadata from given `kv_iter`
164pub async fn gen_test_sstable_data(
165    opts: SstableBuilderOptions,
166    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
167) -> (Bytes, SstableMeta) {
168    let table_id_to_vnode = HashMap::from_iter(vec![(
169        TableId::default().table_id(),
170        VirtualNode::COUNT_FOR_TEST,
171    )]);
172    let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
173    let mut b = SstableBuilder::for_test(
174        0,
175        mock_sst_writer(&opts),
176        opts,
177        table_id_to_vnode,
178        table_id_to_watermark_serde,
179    );
180    for (key, value) in kv_iter {
181        b.add_for_test(key.to_ref(), value.as_slice())
182            .await
183            .unwrap();
184    }
185    let output = b.finish().await.unwrap();
186    output.writer_output
187}
188
189/// Write the data and meta to `sstable_store`.
190pub async fn put_sst(
191    sst_object_id: HummockSstableObjectId,
192    data: Bytes,
193    mut meta: SstableMeta,
194    sstable_store: SstableStoreRef,
195    mut options: SstableWriterOptions,
196    table_ids: Vec<u32>,
197) -> HummockResult<SstableInfo> {
198    options.policy = CachePolicy::NotFill;
199    let mut writer = sstable_store
200        .clone()
201        .create_sst_writer(sst_object_id, options);
202    for block_meta in &meta.block_metas {
203        let offset = block_meta.offset as usize;
204        let end_offset = offset + block_meta.len as usize;
205        writer
206            .write_block(&data[offset..end_offset], block_meta)
207            .await?;
208    }
209
210    // dummy
211    let bloom_filter = {
212        let mut filter_builder = BlockedXor16FilterBuilder::new(100);
213        for _ in &meta.block_metas {
214            filter_builder.switch_block(None);
215        }
216
217        filter_builder.finish(None)
218    };
219
220    meta.meta_offset = writer.data_len() as u64;
221    meta.bloom_filter = bloom_filter;
222    let sst = SstableInfoInner {
223        object_id: sst_object_id,
224        sst_id: sst_object_id,
225        key_range: KeyRange {
226            left: Bytes::from(meta.smallest_key.clone()),
227            right: Bytes::from(meta.largest_key.clone()),
228            right_exclusive: false,
229        },
230        file_size: meta.estimated_size as u64,
231        meta_offset: meta.meta_offset,
232        uncompressed_file_size: meta.estimated_size as u64,
233        table_ids,
234        ..Default::default()
235    }
236    .into();
237    let writer_output = writer.finish(meta).await?;
238    writer_output.await.unwrap()?;
239    Ok(sst)
240}
241
242/// Generates a test table from the given `kv_iter` and put the kv value to `sstable_store`
243pub async fn gen_test_sstable_impl<B: AsRef<[u8]> + Clone + Default + Eq, F: FilterBuilder>(
244    opts: SstableBuilderOptions,
245    object_id: HummockSstableObjectId,
246    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
247    sstable_store: SstableStoreRef,
248    policy: CachePolicy,
249    table_id_to_vnode: HashMap<u32, usize>,
250    table_id_to_watermark_serde: HashMap<u32, Option<(OrderedRowSerde, OrderedRowSerde, usize)>>,
251) -> SstableInfo {
252    let writer_opts = SstableWriterOptions {
253        capacity_hint: None,
254        tracker: None,
255        policy,
256    };
257    let writer = sstable_store
258        .clone()
259        .create_sst_writer(object_id, writer_opts);
260
261    let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
262        FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
263        table_id_to_vnode,
264        table_id_to_watermark_serde,
265    ));
266
267    let mut b = SstableBuilder::<_, F>::new(
268        object_id,
269        writer,
270        F::create(opts.bloom_false_positive, opts.capacity / 16),
271        opts,
272        compaction_catalog_agent_ref,
273        None,
274    );
275
276    let mut last_key = FullKey::<B>::default();
277    for (key, value) in kv_iter {
278        let is_new_user_key =
279            last_key.is_empty() || key.user_key.as_ref() != last_key.user_key.as_ref();
280        if is_new_user_key {
281            last_key = key.clone();
282        }
283
284        b.add(key.to_ref(), value.as_slice()).await.unwrap();
285    }
286    let output = b.finish().await.unwrap();
287    output.writer_output.await.unwrap().unwrap();
288    output.sst_info.sst_info
289}
290
291/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
292pub async fn gen_test_sstable<B: AsRef<[u8]> + Clone + Default + Eq>(
293    opts: SstableBuilderOptions,
294    object_id: HummockSstableObjectId,
295    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
296    sstable_store: SstableStoreRef,
297) -> (TableHolder, SstableInfo) {
298    let table_id_to_vnode = HashMap::from_iter(vec![(
299        TableId::default().table_id(),
300        VirtualNode::COUNT_FOR_TEST,
301    )]);
302
303    let table_id_to_watermark_serde =
304        HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
305
306    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
307        opts,
308        object_id,
309        kv_iter,
310        sstable_store.clone(),
311        CachePolicy::NotFill,
312        table_id_to_vnode,
313        table_id_to_watermark_serde,
314    )
315    .await;
316
317    (
318        sstable_store
319            .sstable(&sst_info, &mut StoreLocalStatistic::default())
320            .await
321            .unwrap(),
322        sst_info,
323    )
324}
325
326pub async fn gen_test_sstable_with_table_ids<B: AsRef<[u8]> + Clone + Default + Eq>(
327    opts: SstableBuilderOptions,
328    object_id: HummockSstableObjectId,
329    kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
330    sstable_store: SstableStoreRef,
331    table_ids: Vec<StateTableId>,
332) -> (TableHolder, SstableInfo) {
333    let table_id_to_vnode = table_ids
334        .iter()
335        .map(|table_id| (*table_id, VirtualNode::COUNT_FOR_TEST))
336        .collect();
337    let table_id_to_watermark_serde = table_ids.iter().map(|table_id| (*table_id, None)).collect();
338
339    let sst_info = gen_test_sstable_impl::<_, Xor16FilterBuilder>(
340        opts,
341        object_id,
342        kv_iter,
343        sstable_store.clone(),
344        CachePolicy::NotFill,
345        table_id_to_vnode,
346        table_id_to_watermark_serde,
347    )
348    .await;
349
350    (
351        sstable_store
352            .sstable(&sst_info, &mut StoreLocalStatistic::default())
353            .await
354            .unwrap(),
355        sst_info,
356    )
357}
358
359/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
360pub async fn gen_test_sstable_info<B: AsRef<[u8]> + Clone + Default + Eq>(
361    opts: SstableBuilderOptions,
362    object_id: HummockSstableObjectId,
363    kv_iter: impl IntoIterator<Item = (FullKey<B>, HummockValue<B>)>,
364    sstable_store: SstableStoreRef,
365) -> SstableInfo {
366    let table_id_to_vnode = HashMap::from_iter(vec![(
367        TableId::default().table_id(),
368        VirtualNode::COUNT_FOR_TEST,
369    )]);
370
371    let table_id_to_watermark_serde =
372        HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
373
374    gen_test_sstable_impl::<_, BlockedXor16FilterBuilder>(
375        opts,
376        object_id,
377        kv_iter,
378        sstable_store,
379        CachePolicy::NotFill,
380        table_id_to_vnode,
381        table_id_to_watermark_serde,
382    )
383    .await
384}
385
386/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
387pub async fn gen_test_sstable_with_range_tombstone(
388    opts: SstableBuilderOptions,
389    object_id: HummockSstableObjectId,
390    kv_iter: impl Iterator<Item = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>)>,
391    sstable_store: SstableStoreRef,
392) -> SstableInfo {
393    let table_id_to_vnode = HashMap::from_iter(vec![(
394        TableId::default().table_id(),
395        VirtualNode::COUNT_FOR_TEST,
396    )]);
397
398    let table_id_to_watermark_serde =
399        HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
400
401    gen_test_sstable_impl::<_, Xor16FilterBuilder>(
402        opts,
403        object_id,
404        kv_iter,
405        sstable_store.clone(),
406        CachePolicy::Fill(CacheHint::Normal),
407        table_id_to_vnode,
408        table_id_to_watermark_serde,
409    )
410    .await
411}
412
413/// Generates a user key with table id 0 and the given `table_key`
414pub fn test_user_key(table_key: impl AsRef<[u8]>) -> UserKey<Vec<u8>> {
415    UserKey::for_test(TableId::default(), table_key.as_ref().to_vec())
416}
417
418/// Generates a user key with table id 0 and table key format of `key_test_{idx * 2}`
419pub fn test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
420    let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
421    table_key.extend_from_slice(format!("key_test_{:05}", idx * 2).as_bytes());
422    UserKey::for_test(TableId::default(), table_key)
423}
424
425/// Generates a full key with table id 0 and epoch 123. User key is created with `test_user_key_of`.
426pub fn test_key_of(idx: usize) -> FullKey<Vec<u8>> {
427    FullKey {
428        user_key: test_user_key_of(idx),
429        epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
430    }
431}
432
/// The value stored for index `idx` in the test table.
///
/// It is the byte pattern `23332333`, cycled and truncated to `idx % 100 + 1` bytes so the
/// table stays small.
pub fn test_value_of(idx: usize) -> Vec<u8> {
    const PATTERN: &[u8] = b"23332333";
    let len = idx % 100 + 1;
    PATTERN.iter().copied().cycle().take(len).collect()
}
443
444/// Generates a test table used in almost all table-related tests. Developers may verify the
445/// correctness of their implementations by comparing the got value and the expected value
446/// generated by `test_key_of` and `test_value_of`.
447pub async fn gen_default_test_sstable(
448    opts: SstableBuilderOptions,
449    object_id: HummockSstableObjectId,
450    sstable_store: SstableStoreRef,
451) -> (TableHolder, SstableInfo) {
452    gen_test_sstable(
453        opts,
454        object_id,
455        (0..TEST_KEYS_COUNT).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
456        sstable_store,
457    )
458    .await
459}
460
461pub async fn count_stream(mut i: impl StateStoreIter) -> usize {
462    let mut c: usize = 0;
463    while i.try_next().await.unwrap().is_some() {
464        c += 1
465    }
466    c
467}
468
469pub fn create_small_table_cache() -> Arc<LruCache<HummockSstableObjectId, Box<Sstable>>> {
470    Arc::new(LruCache::new(1, 4, 0))
471}
472
473pub async fn hybrid_cache_for_test<K, V>() -> HybridCache<K, V>
474where
475    K: HybridKey,
476    V: HybridValue,
477{
478    HybridCacheBuilder::new()
479        .memory(10)
480        .storage(Engine::Large)
481        .build()
482        .await
483        .unwrap()
484}
485
486pub trait StateStoreReadTestExt: StateStore {
487    /// Point gets a value from the state store.
488    /// The result is based on a snapshot corresponding to the given `epoch`.
489    /// Both full key and the value are returned.
490    fn get_keyed_row(
491        &self,
492        key: TableKey<Bytes>,
493        epoch: u64,
494        read_options: ReadOptions,
495    ) -> impl StorageFuture<'_, Option<StateStoreKeyedRow>>;
496
497    /// Point gets a value from the state store.
498    /// The result is based on a snapshot corresponding to the given `epoch`.
499    /// Only the value is returned.
500    fn get(
501        &self,
502        key: TableKey<Bytes>,
503        epoch: u64,
504        read_options: ReadOptions,
505    ) -> impl StorageFuture<'_, Option<Bytes>> {
506        self.get_keyed_row(key, epoch, read_options)
507            .map_ok(|v| v.map(|(_, v)| v))
508    }
509
510    /// Opens and returns an iterator for given `prefix_hint` and `full_key_range`
511    /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and
512    /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included
513    /// in `key_range`) The returned iterator will iterate data based on a snapshot
514    /// corresponding to the given `epoch`.
515    fn iter(
516        &self,
517        key_range: TableKeyRange,
518        epoch: u64,
519        read_options: ReadOptions,
520    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter>;
521
522    fn rev_iter(
523        &self,
524        key_range: TableKeyRange,
525        epoch: u64,
526        read_options: ReadOptions,
527    ) -> impl StorageFuture<'_, <<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter>;
528
529    fn scan(
530        &self,
531        key_range: TableKeyRange,
532        epoch: u64,
533        limit: Option<usize>,
534        read_options: ReadOptions,
535    ) -> impl StorageFuture<'_, Vec<StateStoreKeyedRow>>;
536}
537
538impl<S: StateStore> StateStoreReadTestExt for S {
539    async fn get_keyed_row(
540        &self,
541        key: TableKey<Bytes>,
542        epoch: u64,
543        read_options: ReadOptions,
544    ) -> StorageResult<Option<StateStoreKeyedRow>> {
545        let snapshot = self
546            .new_read_snapshot(
547                HummockReadEpoch::NoWait(epoch),
548                NewReadSnapshotOptions {
549                    table_id: read_options.table_id,
550                },
551            )
552            .await?;
553        snapshot.get_keyed_row(key, read_options).await
554    }
555
556    async fn iter(
557        &self,
558        key_range: TableKeyRange,
559        epoch: u64,
560        read_options: ReadOptions,
561    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::Iter> {
562        let snapshot = self
563            .new_read_snapshot(
564                HummockReadEpoch::NoWait(epoch),
565                NewReadSnapshotOptions {
566                    table_id: read_options.table_id,
567                },
568            )
569            .await?;
570        snapshot.iter(key_range, read_options).await
571    }
572
573    async fn rev_iter(
574        &self,
575        key_range: TableKeyRange,
576        epoch: u64,
577        read_options: ReadOptions,
578    ) -> StorageResult<<<Self as StateStore>::ReadSnapshot as StateStoreRead>::RevIter> {
579        let snapshot = self
580            .new_read_snapshot(
581                HummockReadEpoch::NoWait(epoch),
582                NewReadSnapshotOptions {
583                    table_id: read_options.table_id,
584                },
585            )
586            .await?;
587        snapshot.rev_iter(key_range, read_options).await
588    }
589
590    async fn scan(
591        &self,
592        key_range: TableKeyRange,
593        epoch: u64,
594        limit: Option<usize>,
595        mut read_options: ReadOptions,
596    ) -> StorageResult<Vec<StateStoreKeyedRow>> {
597        if limit.is_some() {
598            read_options.prefetch_options.prefetch = false;
599        }
600        const MAX_INITIAL_CAP: usize = 1024;
601        let limit = limit.unwrap_or(usize::MAX);
602        let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
603        let mut iter = self.iter(key_range, epoch, read_options).await?;
604        while let Some((key, value)) = iter.try_next().await? {
605            ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
606        }
607        Ok(ret)
608    }
609}