risingwave_storage/hummock/iterator/
test_utils.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use foyer::{CacheBuilder, HybridCacheBuilder};
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::epoch::test_epoch;
24use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, prefix_slice_with_vnode};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
27use risingwave_object_store::object::{
28 InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef,
29};
30
31use crate::hummock::none::NoneRecentFilter;
32use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
33use crate::hummock::sstable::SstableIteratorReadOptions;
34use crate::hummock::sstable_store::SstableStore;
35pub use crate::hummock::test_utils::default_builder_opt_for_test;
36use crate::hummock::test_utils::{
37 gen_test_sstable, gen_test_sstable_info, gen_test_sstable_with_range_tombstone,
38};
39use crate::hummock::{
40 HummockValue, SstableBuilderOptions, SstableIterator, SstableIteratorType, SstableStoreConfig,
41 SstableStoreRef, TableHolder,
42};
43use crate::monitor::{ObjectStoreMetrics, global_hummock_state_store_metrics};
44
45#[macro_export]
47macro_rules! assert_bytes_eq {
48 ($left:expr, $right:expr) => {{
49 use bytes::Bytes;
50 assert_eq!(
51 Bytes::copy_from_slice(&$left),
52 Bytes::copy_from_slice(&$right)
53 )
54 }};
55}
56
57pub const TEST_KEYS_COUNT: usize = 10;
58
59pub async fn mock_sstable_store() -> SstableStoreRef {
60 mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
61 InMemObjectStore::for_test().monitored(
62 Arc::new(ObjectStoreMetrics::unused()),
63 Arc::new(ObjectStoreConfig::default()),
64 ),
65 )))
66 .await
67}
68
69pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
70 let path = "test".to_owned();
71 let meta_cache = HybridCacheBuilder::new()
72 .memory(64 << 20)
73 .with_shards(2)
74 .storage()
75 .build()
76 .await
77 .unwrap();
78 let block_cache = HybridCacheBuilder::new()
79 .memory(64 << 20)
80 .with_shards(2)
81 .storage()
82 .build()
83 .await
84 .unwrap();
85 Arc::new(SstableStore::new(SstableStoreConfig {
86 store,
87 path,
88
89 prefetch_buffer_capacity: 64 << 20,
90 max_prefetch_block_number: 16,
91
92 recent_filter: Arc::new(NoneRecentFilter::default().into()),
93 state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
94 use_new_object_prefix_strategy: true,
95
96 meta_cache,
97 block_cache,
98 vector_meta_cache: CacheBuilder::new(64 << 20).build(),
99 vector_block_cache: CacheBuilder::new(64 << 20).build(),
100 }))
101}
102
103pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
105 prefix_slice_with_vnode(VirtualNode::ZERO, format!("key_test_{:05}", idx).as_bytes()).to_vec()
106}
107
108pub fn iterator_test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
109 UserKey::for_test(TableId::default(), iterator_test_table_key_of(idx))
110}
111
112pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey<Bytes> {
113 UserKey::for_test(
114 TableId::default(),
115 Bytes::from(iterator_test_table_key_of(idx)),
116 )
117}
118
119pub fn iterator_test_key_of(idx: usize) -> FullKey<Vec<u8>> {
121 FullKey {
122 user_key: iterator_test_user_key_of(idx),
123 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(233)),
124 }
125}
126
127pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey<Bytes> {
129 iterator_test_key_of(idx).into_bytes()
130}
131
132pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Vec<u8>> {
134 FullKey {
135 user_key: iterator_test_user_key_of(idx),
136 epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
137 }
138}
139
140pub fn iterator_test_bytes_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Bytes> {
142 iterator_test_key_of_epoch(idx, test_epoch(epoch)).into_bytes()
143}
144
145pub fn iterator_test_value_of(idx: usize) -> Vec<u8> {
147 format!("value_test_{:05}", idx).as_bytes().to_vec()
148}
149
150pub fn transform_shared_buffer(
151 batches: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
152) -> Vec<(TableKey<Bytes>, SharedBufferValue<Bytes>)> {
153 batches
154 .into_iter()
155 .map(|(k, v)| (TableKey(k.into()), v))
156 .collect_vec()
157}
158
159pub async fn gen_iterator_test_sstable_info(
163 object_id: u64,
164 opts: SstableBuilderOptions,
165 idx_mapping: impl Fn(usize) -> usize,
166 sstable_store: SstableStoreRef,
167 total: usize,
168) -> SstableInfo {
169 gen_test_sstable_info(
170 opts,
171 object_id,
172 (0..total).map(|i| {
173 (
174 iterator_test_key_of(idx_mapping(i)),
175 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
176 )
177 }),
178 sstable_store,
179 )
180 .await
181}
182
183pub async fn gen_iterator_test_sstable_base(
187 object_id: u64,
188 opts: SstableBuilderOptions,
189 idx_mapping: impl Fn(usize) -> usize,
190 sstable_store: SstableStoreRef,
191 total: usize,
192) -> (TableHolder, SstableInfo) {
193 gen_test_sstable(
194 opts,
195 object_id,
196 (0..total).map(|i| {
197 (
198 iterator_test_key_of(idx_mapping(i)),
199 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
200 )
201 }),
202 sstable_store,
203 )
204 .await
205}
206
207pub async fn gen_iterator_test_sstable_from_kv_pair(
209 object_id: u64,
210 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
211 sstable_store: SstableStoreRef,
212) -> (TableHolder, SstableInfo) {
213 gen_test_sstable(
214 default_builder_opt_for_test(),
215 object_id,
216 kv_pairs
217 .into_iter()
218 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
219 sstable_store,
220 )
221 .await
222}
223
224pub async fn gen_iterator_test_sstable_with_range_tombstones(
226 object_id: u64,
227 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
228 sstable_store: SstableStoreRef,
229) -> SstableInfo {
230 gen_test_sstable_with_range_tombstone(
231 default_builder_opt_for_test(),
232 object_id,
233 kv_pairs
234 .into_iter()
235 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
236 sstable_store,
237 )
238 .await
239}
240
241pub async fn gen_merge_iterator_interleave_test_sstable_iters(
242 key_count: usize,
243 count: usize,
244) -> Vec<SstableIterator> {
245 let sstable_store = mock_sstable_store().await;
246 let mut result = vec![];
247 for i in 0..count {
248 let (table, sstable_info) = gen_iterator_test_sstable_base(
249 i as u64,
250 default_builder_opt_for_test(),
251 |x| x * count + i,
252 sstable_store.clone(),
253 key_count,
254 )
255 .await;
256 result.push(SstableIterator::create(
257 table,
258 sstable_store.clone(),
259 Arc::new(SstableIteratorReadOptions::default()),
260 &sstable_info,
261 ));
262 }
263 result
264}
265
266pub async fn gen_iterator_test_sstable_with_incr_epoch(
267 object_id: u64,
268 opts: SstableBuilderOptions,
269 idx_mapping: impl Fn(usize) -> usize,
270 sstable_store: SstableStoreRef,
271 total: usize,
272 epoch_base: u64,
273) -> (TableHolder, SstableInfo) {
274 gen_test_sstable(
275 opts,
276 object_id,
277 (0..total).map(|i| {
278 (
279 iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i as u64)),
280 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
281 )
282 }),
283 sstable_store,
284 )
285 .await
286}