risingwave_storage/hummock/iterator/
test_utils.rs1use std::sync::Arc;
16
17use bytes::Bytes;
18use foyer::{Engine, HybridCacheBuilder};
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::util::epoch::test_epoch;
24use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, prefix_slice_with_vnode};
25use risingwave_hummock_sdk::sstable_info::SstableInfo;
26use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
27use risingwave_object_store::object::{
28 InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef,
29};
30
31use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
32use crate::hummock::sstable::SstableIteratorReadOptions;
33use crate::hummock::sstable_store::SstableStore;
34pub use crate::hummock::test_utils::default_builder_opt_for_test;
35use crate::hummock::test_utils::{
36 gen_test_sstable, gen_test_sstable_info, gen_test_sstable_with_range_tombstone,
37};
38use crate::hummock::{
39 HummockValue, SstableBuilderOptions, SstableIterator, SstableIteratorType, SstableStoreConfig,
40 SstableStoreRef, TableHolder,
41};
42use crate::monitor::{ObjectStoreMetrics, global_hummock_state_store_metrics};
43
44#[macro_export]
46macro_rules! assert_bytes_eq {
47 ($left:expr, $right:expr) => {{
48 use bytes::Bytes;
49 assert_eq!(
50 Bytes::copy_from_slice(&$left),
51 Bytes::copy_from_slice(&$right)
52 )
53 }};
54}
55
56pub const TEST_KEYS_COUNT: usize = 10;
57
58pub async fn mock_sstable_store() -> SstableStoreRef {
59 mock_sstable_store_with_object_store(Arc::new(ObjectStoreImpl::InMem(
60 InMemObjectStore::new().monitored(
61 Arc::new(ObjectStoreMetrics::unused()),
62 Arc::new(ObjectStoreConfig::default()),
63 ),
64 )))
65 .await
66}
67
68pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
69 let path = "test".to_owned();
70 let meta_cache = HybridCacheBuilder::new()
71 .memory(64 << 20)
72 .with_shards(2)
73 .storage(Engine::Large)
74 .build()
75 .await
76 .unwrap();
77 let block_cache = HybridCacheBuilder::new()
78 .memory(64 << 20)
79 .with_shards(2)
80 .storage(Engine::Large)
81 .build()
82 .await
83 .unwrap();
84 Arc::new(SstableStore::new(SstableStoreConfig {
85 store,
86 path,
87
88 prefetch_buffer_capacity: 64 << 20,
89 max_prefetch_block_number: 16,
90
91 recent_filter: None,
92 state_store_metrics: Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled)),
93 use_new_object_prefix_strategy: true,
94
95 meta_cache,
96 block_cache,
97 }))
98}
99
100pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
102 prefix_slice_with_vnode(VirtualNode::ZERO, format!("key_test_{:05}", idx).as_bytes()).to_vec()
103}
104
105pub fn iterator_test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
106 UserKey::for_test(TableId::default(), iterator_test_table_key_of(idx))
107}
108
109pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey<Bytes> {
110 UserKey::for_test(
111 TableId::default(),
112 Bytes::from(iterator_test_table_key_of(idx)),
113 )
114}
115
116pub fn iterator_test_key_of(idx: usize) -> FullKey<Vec<u8>> {
118 FullKey {
119 user_key: iterator_test_user_key_of(idx),
120 epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(233)),
121 }
122}
123
124pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey<Bytes> {
126 iterator_test_key_of(idx).into_bytes()
127}
128
129pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Vec<u8>> {
131 FullKey {
132 user_key: iterator_test_user_key_of(idx),
133 epoch_with_gap: EpochWithGap::new_from_epoch(epoch),
134 }
135}
136
137pub fn iterator_test_bytes_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Bytes> {
139 iterator_test_key_of_epoch(idx, test_epoch(epoch)).into_bytes()
140}
141
142pub fn iterator_test_value_of(idx: usize) -> Vec<u8> {
144 format!("value_test_{:05}", idx).as_bytes().to_vec()
145}
146
147pub fn transform_shared_buffer(
148 batches: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
149) -> Vec<(TableKey<Bytes>, SharedBufferValue<Bytes>)> {
150 batches
151 .into_iter()
152 .map(|(k, v)| (TableKey(k.into()), v))
153 .collect_vec()
154}
155
156pub async fn gen_iterator_test_sstable_info(
160 object_id: HummockSstableObjectId,
161 opts: SstableBuilderOptions,
162 idx_mapping: impl Fn(usize) -> usize,
163 sstable_store: SstableStoreRef,
164 total: usize,
165) -> SstableInfo {
166 gen_test_sstable_info(
167 opts,
168 object_id,
169 (0..total).map(|i| {
170 (
171 iterator_test_key_of(idx_mapping(i)),
172 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
173 )
174 }),
175 sstable_store,
176 )
177 .await
178}
179
180pub async fn gen_iterator_test_sstable_base(
184 object_id: HummockSstableObjectId,
185 opts: SstableBuilderOptions,
186 idx_mapping: impl Fn(usize) -> usize,
187 sstable_store: SstableStoreRef,
188 total: usize,
189) -> (TableHolder, SstableInfo) {
190 gen_test_sstable(
191 opts,
192 object_id,
193 (0..total).map(|i| {
194 (
195 iterator_test_key_of(idx_mapping(i)),
196 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
197 )
198 }),
199 sstable_store,
200 )
201 .await
202}
203
204pub async fn gen_iterator_test_sstable_from_kv_pair(
206 object_id: HummockSstableObjectId,
207 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
208 sstable_store: SstableStoreRef,
209) -> (TableHolder, SstableInfo) {
210 gen_test_sstable(
211 default_builder_opt_for_test(),
212 object_id,
213 kv_pairs
214 .into_iter()
215 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
216 sstable_store,
217 )
218 .await
219}
220
221pub async fn gen_iterator_test_sstable_with_range_tombstones(
223 object_id: HummockSstableObjectId,
224 kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
225 sstable_store: SstableStoreRef,
226) -> SstableInfo {
227 gen_test_sstable_with_range_tombstone(
228 default_builder_opt_for_test(),
229 object_id,
230 kv_pairs
231 .into_iter()
232 .map(|kv| (iterator_test_key_of_epoch(kv.0, test_epoch(kv.1)), kv.2)),
233 sstable_store,
234 )
235 .await
236}
237
238pub async fn gen_merge_iterator_interleave_test_sstable_iters(
239 key_count: usize,
240 count: usize,
241) -> Vec<SstableIterator> {
242 let sstable_store = mock_sstable_store().await;
243 let mut result = vec![];
244 for i in 0..count {
245 let (table, sstable_info) = gen_iterator_test_sstable_base(
246 i as HummockSstableObjectId,
247 default_builder_opt_for_test(),
248 |x| x * count + i,
249 sstable_store.clone(),
250 key_count,
251 )
252 .await;
253 result.push(SstableIterator::create(
254 table,
255 sstable_store.clone(),
256 Arc::new(SstableIteratorReadOptions::default()),
257 &sstable_info,
258 ));
259 }
260 result
261}
262
263pub async fn gen_iterator_test_sstable_with_incr_epoch(
264 object_id: HummockSstableObjectId,
265 opts: SstableBuilderOptions,
266 idx_mapping: impl Fn(usize) -> usize,
267 sstable_store: SstableStoreRef,
268 total: usize,
269 epoch_base: u64,
270) -> (TableHolder, SstableInfo) {
271 gen_test_sstable(
272 opts,
273 object_id,
274 (0..total).map(|i| {
275 (
276 iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i as u64)),
277 HummockValue::put(iterator_test_value_of(idx_mapping(i))),
278 )
279 }),
280 sstable_store,
281 )
282 .await
283}