risingwave_storage/hummock/iterator/
test_utils.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use foyer::{CacheBuilder, Engine, HybridCacheBuilder, LargeEngineOptions};
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::epoch::test_epoch;
24use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, prefix_slice_with_vnode};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
27use risingwave_object_store::object::{
28 InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef,
29};
30
31use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
32use crate::hummock::sstable::SstableIteratorReadOptions;
33use crate::hummock::sstable_store::SstableStore;
34pub use crate::hummock::test_utils::default_builder_opt_for_test;
35use crate::hummock::test_utils::{
36 gen_test_sstable, gen_test_sstable_info, gen_test_sstable_with_range_tombstone,
37};
38use crate::hummock::{
39 HummockValue, SstableBuilderOptions, SstableIterator, SstableIteratorType, SstableStoreConfig,
40 SstableStoreRef, TableHolder,
41};
42use crate::monitor::{ObjectStoreMetrics, global_hummock_state_store_metrics};
43
/// Asserts that two byte-slice-like expressions contain exactly the same bytes.
///
/// Each operand is borrowed and copied into a `bytes::Bytes` before the comparison, so a
/// failing assertion prints the `Debug` form of `Bytes` for both sides. The expansion
/// refers to the `bytes` crate by name, so that crate must be resolvable at the call site.
#[macro_export]
macro_rules! assert_bytes_eq {
    ($left:expr, $right:expr) => {{
        use bytes::Bytes;
        let lhs_bytes = Bytes::copy_from_slice(&$left);
        let rhs_bytes = Bytes::copy_from_slice(&$right);
        assert_eq!(lhs_bytes, rhs_bytes)
    }};
}
55
/// Key count shared by iterator tests.
///
/// NOTE(review): nothing in the visible part of this file reads the constant; presumably
/// callers pass it as the `total` / `key_count` argument of the generators below — confirm
/// against the call sites.
pub const TEST_KEYS_COUNT: usize = 10;
57
58pub async fn mock_sstable_store() -> SstableStoreRef {
59 mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
60 InMemObjectStore::for_test().monitored(
61 Arc::new(ObjectStoreMetrics::unused()),
62 Arc::new(ObjectStoreConfig::default()),
63 ),
64 )))
65 .await
66}
67
68pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
69 let path = "test".to_owned();
70 let meta_cache = HybridCacheBuilder::new()
71 .memory(64 << 20)
72 .with_shards(2)
73 .storage(Engine::Large(LargeEngineOptions::new()))
74 .build()
75 .await
76 .unwrap();
77 let block_cache = HybridCacheBuilder::new()
78 .memory(64 << 20)
79 .with_shards(2)
80 .storage(Engine::Large(LargeEngineOptions::new()))
81 .build()
82 .await
83 .unwrap();
84 Arc::new(SstableStore::new(SstableStoreConfig {
85 store,
86 path,
87
88 prefetch_buffer_capacity: 64 << 20,
89 max_prefetch_block_number: 16,
90
91 recent_filter: None,
92 state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
93 use_new_object_prefix_strategy: true,
94
95 meta_cache,
96 block_cache,
97 vector_meta_cache: CacheBuilder::new(64 << 20).build(),
98 vector_block_cache: CacheBuilder::new(64 << 20).build(),
99 }))
100}
101
102pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
104 prefix_slice_with_vnode(VirtualNode::ZERO, format!("key_test_{:05}", idx).as_bytes()).to_vec()
105}
106
107pub fn iterator_test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
108 UserKey::for_test(TableId::default(), iterator_test_table_key_of(idx))
109}
110
111pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey<Bytes> {
112 UserKey::for_test(
113 TableId::default(),
114 Bytes::from(iterator_test_table_key_of(idx)),
115 )
116}
117
118pub fn iterator_test_key_of(idx: usize) -> FullKey<Vec<u8>> {
120 FullKey {
121 user_key: iterator_test_user_key_of(idx),
122 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(233)),
123 }
124}
125
126pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey<Bytes> {
128 iterator_test_key_of(idx).into_bytes()
129}
130
131pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Vec<u8>> {
133 FullKey {
134 user_key: iterator_test_user_key_of(idx),
135 epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
136 }
137}
138
139pub fn iterator_test_bytes_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Bytes> {
141 iterator_test_key_of_epoch(idx, test_epoch(epoch)).into_bytes()
142}
143
/// Returns the value for test index `idx`.
///
/// The value is the ASCII text `value_test_` followed by `idx` zero-padded to at least five
/// digits; larger indices are written out in full.
pub fn iterator_test_value_of(idx: usize) -> Vec<u8> {
    // `into_bytes` hands over the string's buffer instead of copying it into a second `Vec`.
    format!("value_test_{idx:05}").into_bytes()
}
148
149pub fn transform_shared_buffer(
150 batches: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
151) -> Vec<(TableKey<Bytes>, SharedBufferValue<Bytes>)> {
152 batches
153 .into_iter()
154 .map(|(k, v)| (TableKey(k.into()), v))
155 .collect_vec()
156}
157
158pub async fn gen_iterator_test_sstable_info(
162 object_id: u64,
163 opts: SstableBuilderOptions,
164 idx_mapping: impl Fn(usize) -> usize,
165 sstable_store: SstableStoreRef,
166 total: usize,
167) -> SstableInfo {
168 gen_test_sstable_info(
169 opts,
170 object_id,
171 (0..total).map(|i| {
172 (
173 iterator_test_key_of(idx_mapping(i)),
174 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
175 )
176 }),
177 sstable_store,
178 )
179 .await
180}
181
182pub async fn gen_iterator_test_sstable_base(
186 object_id: u64,
187 opts: SstableBuilderOptions,
188 idx_mapping: impl Fn(usize) -> usize,
189 sstable_store: SstableStoreRef,
190 total: usize,
191) -> (TableHolder, SstableInfo) {
192 gen_test_sstable(
193 opts,
194 object_id,
195 (0..total).map(|i| {
196 (
197 iterator_test_key_of(idx_mapping(i)),
198 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
199 )
200 }),
201 sstable_store,
202 )
203 .await
204}
205
206pub async fn gen_iterator_test_sstable_from_kv_pair(
208 object_id: u64,
209 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
210 sstable_store: SstableStoreRef,
211) -> (TableHolder, SstableInfo) {
212 gen_test_sstable(
213 default_builder_opt_for_test(),
214 object_id,
215 kv_pairs
216 .into_iter()
217 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
218 sstable_store,
219 )
220 .await
221}
222
223pub async fn gen_iterator_test_sstable_with_range_tombstones(
225 object_id: u64,
226 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
227 sstable_store: SstableStoreRef,
228) -> SstableInfo {
229 gen_test_sstable_with_range_tombstone(
230 default_builder_opt_for_test(),
231 object_id,
232 kv_pairs
233 .into_iter()
234 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
235 sstable_store,
236 )
237 .await
238}
239
240pub async fn gen_merge_iterator_interleave_test_sstable_iters(
241 key_count: usize,
242 count: usize,
243) -> Vec<SstableIterator> {
244 let sstable_store = mock_sstable_store().await;
245 let mut result = vec![];
246 for i in 0..count {
247 let (table, sstable_info) = gen_iterator_test_sstable_base(
248 i as u64,
249 default_builder_opt_for_test(),
250 |x| x * count + i,
251 sstable_store.clone(),
252 key_count,
253 )
254 .await;
255 result.push(SstableIterator::create(
256 table,
257 sstable_store.clone(),
258 Arc::new(SstableIteratorReadOptions::default()),
259 &sstable_info,
260 ));
261 }
262 result
263}
264
265pub async fn gen_iterator_test_sstable_with_incr_epoch(
266 object_id: u64,
267 opts: SstableBuilderOptions,
268 idx_mapping: impl Fn(usize) -> usize,
269 sstable_store: SstableStoreRef,
270 total: usize,
271 epoch_base: u64,
272) -> (TableHolder, SstableInfo) {
273 gen_test_sstable(
274 opts,
275 object_id,
276 (0..total).map(|i| {
277 (
278 iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i as u64)),
279 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
280 )
281 }),
282 sstable_store,
283 )
284 .await
285}