risingwave_storage/hummock/iterator/
test_utils.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use foyer::{CacheBuilder, HybridCacheBuilder};
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::epoch::test_epoch;
24use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, prefix_slice_with_vnode};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};
27use risingwave_object_store::object::{
28 InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef,
29};
30
31use crate::hummock::none::NoneRecentFilter;
32use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
33use crate::hummock::sstable::SstableIteratorReadOptions;
34use crate::hummock::sstable_store::SstableStore;
35pub use crate::hummock::test_utils::default_builder_opt_for_test;
36use crate::hummock::test_utils::{
37 gen_test_sstable, gen_test_sstable_info, gen_test_sstable_with_range_tombstone,
38};
39use crate::hummock::{
40 HummockValue, SstableBuilderOptions, SstableIterator, SstableIteratorType, SstableStoreConfig,
41 SstableStoreRef, TableHolder,
42};
43use crate::monitor::{ObjectStoreMetrics, global_hummock_state_store_metrics};
44
45#[macro_export]
47macro_rules! assert_bytes_eq {
48 ($left:expr, $right:expr) => {{
49 use bytes::Bytes;
50 assert_eq!(
51 Bytes::copy_from_slice(&$left),
52 Bytes::copy_from_slice(&$right)
53 )
54 }};
55}
56
57pub const TEST_KEYS_COUNT: usize = 10;
58
59pub async fn mock_sstable_store() -> SstableStoreRef {
60 mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
61 InMemObjectStore::for_test().monitored(
62 Arc::new(ObjectStoreMetrics::unused()),
63 Arc::new(ObjectStoreConfig::default()),
64 ),
65 )))
66 .await
67}
68
69pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
70 let path = "test".to_owned();
71 let meta_cache = HybridCacheBuilder::new()
72 .memory(64 << 20)
73 .with_shards(2)
74 .storage()
75 .build()
76 .await
77 .unwrap();
78 let block_cache = HybridCacheBuilder::new()
79 .memory(64 << 20)
80 .with_shards(2)
81 .storage()
82 .build()
83 .await
84 .unwrap();
85 Arc::new(SstableStore::new(SstableStoreConfig {
86 store,
87 path,
88
89 prefetch_buffer_capacity: 64 << 20,
90 max_prefetch_block_number: 16,
91
92 recent_filter: Arc::new(NoneRecentFilter::default().into()),
93 state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
94 use_new_object_prefix_strategy: true,
95 skip_bloom_filter_in_serde: false,
96
97 meta_cache,
98 block_cache,
99 vector_meta_cache: CacheBuilder::new(64 << 20).build(),
100 vector_block_cache: CacheBuilder::new(64 << 20).build(),
101 }))
102}
103
104pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
106 prefix_slice_with_vnode(VirtualNode::ZERO, format!("key_test_{:05}", idx).as_bytes()).to_vec()
107}
108
109pub fn iterator_test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
110 UserKey::for_test(TableId::default(), iterator_test_table_key_of(idx))
111}
112
113pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey<Bytes> {
114 UserKey::for_test(
115 TableId::default(),
116 Bytes::from(iterator_test_table_key_of(idx)),
117 )
118}
119
120pub fn iterator_test_key_of(idx: usize) -> FullKey<Vec<u8>> {
122 FullKey {
123 user_key: iterator_test_user_key_of(idx),
124 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(233)),
125 }
126}
127
128pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey<Bytes> {
130 iterator_test_key_of(idx).into_bytes()
131}
132
133pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Vec<u8>> {
135 FullKey {
136 user_key: iterator_test_user_key_of(idx),
137 epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
138 }
139}
140
141pub fn iterator_test_bytes_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Bytes> {
143 iterator_test_key_of_epoch(idx, test_epoch(epoch)).into_bytes()
144}
145
146pub fn iterator_test_value_of(idx: usize) -> Vec<u8> {
148 format!("value_test_{:05}", idx).as_bytes().to_vec()
149}
150
151pub fn transform_shared_buffer(
152 batches: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
153) -> Vec<(TableKey<Bytes>, SharedBufferValue<Bytes>)> {
154 batches
155 .into_iter()
156 .map(|(k, v)| (TableKey(k.into()), v))
157 .collect_vec()
158}
159
160pub async fn gen_iterator_test_sstable_info(
164 object_id: u64,
165 opts: SstableBuilderOptions,
166 idx_mapping: impl Fn(usize) -> usize,
167 sstable_store: SstableStoreRef,
168 total: usize,
169) -> SstableInfo {
170 gen_test_sstable_info(
171 opts,
172 object_id,
173 (0..total).map(|i| {
174 (
175 iterator_test_key_of(idx_mapping(i)),
176 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
177 )
178 }),
179 sstable_store,
180 )
181 .await
182}
183
184pub async fn gen_iterator_test_sstable_base(
188 object_id: u64,
189 opts: SstableBuilderOptions,
190 idx_mapping: impl Fn(usize) -> usize,
191 sstable_store: SstableStoreRef,
192 total: usize,
193) -> (TableHolder, SstableInfo) {
194 gen_test_sstable(
195 opts,
196 object_id,
197 (0..total).map(|i| {
198 (
199 iterator_test_key_of(idx_mapping(i)),
200 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
201 )
202 }),
203 sstable_store,
204 )
205 .await
206}
207
208pub async fn gen_iterator_test_sstable_from_kv_pair(
210 object_id: u64,
211 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
212 sstable_store: SstableStoreRef,
213) -> (TableHolder, SstableInfo) {
214 gen_test_sstable(
215 default_builder_opt_for_test(),
216 object_id,
217 kv_pairs
218 .into_iter()
219 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
220 sstable_store,
221 )
222 .await
223}
224
225pub async fn gen_iterator_test_sstable_with_range_tombstones(
227 object_id: u64,
228 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
229 sstable_store: SstableStoreRef,
230) -> SstableInfo {
231 gen_test_sstable_with_range_tombstone(
232 default_builder_opt_for_test(),
233 object_id,
234 kv_pairs
235 .into_iter()
236 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
237 sstable_store,
238 )
239 .await
240}
241
242pub async fn gen_merge_iterator_interleave_test_sstable_iters(
243 key_count: usize,
244 count: usize,
245) -> Vec<SstableIterator> {
246 let sstable_store = mock_sstable_store().await;
247 let mut result = vec![];
248 for i in 0..count {
249 let (table, sstable_info) = gen_iterator_test_sstable_base(
250 i as u64,
251 default_builder_opt_for_test(),
252 |x| x * count + i,
253 sstable_store.clone(),
254 key_count,
255 )
256 .await;
257 result.push(SstableIterator::create(
258 table,
259 sstable_store.clone(),
260 Arc::new(SstableIteratorReadOptions::default()),
261 &sstable_info,
262 ));
263 }
264 result
265}
266
267pub async fn gen_iterator_test_sstable_with_incr_epoch(
268 object_id: u64,
269 opts: SstableBuilderOptions,
270 idx_mapping: impl Fn(usize) -> usize,
271 sstable_store: SstableStoreRef,
272 total: usize,
273 epoch_base: u64,
274) -> (TableHolder, SstableInfo) {
275 gen_test_sstable(
276 opts,
277 object_id,
278 (0..total).map(|i| {
279 (
280 iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i as u64)),
281 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
282 )
283 }),
284 sstable_store,
285 )
286 .await
287}