// risingwave_storage/hummock/iterator/test_utils.rs
use std::sync::Arc;
use bytes::Bytes;
use foyer::{Engine, HybridCacheBuilder};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::key::{prefix_slice_with_vnode, FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, HummockSstableObjectId};
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, ObjectStoreRef,
};
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferValue;
use crate::hummock::sstable::SstableIteratorReadOptions;
use crate::hummock::sstable_store::SstableStore;
pub use crate::hummock::test_utils::default_builder_opt_for_test;
use crate::hummock::test_utils::{
gen_test_sstable, gen_test_sstable_info, gen_test_sstable_with_range_tombstone,
};
use crate::hummock::{
HummockValue, SstableBuilderOptions, SstableIterator, SstableIteratorType, SstableStoreConfig,
SstableStoreRef, TableHolder,
};
use crate::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};
/// Asserts that two byte-like expressions contain the same bytes.
///
/// Each operand is borrowed and copied into a `bytes::Bytes` through
/// `Bytes::copy_from_slice`; the two copies are then compared with `assert_eq!`.
/// The left operand is evaluated before the right one.
#[macro_export]
macro_rules! assert_bytes_eq {
    ($left:expr, $right:expr) => {{
        use bytes::Bytes;
        let lhs = Bytes::copy_from_slice(&$left);
        let rhs = Bytes::copy_from_slice(&$right);
        assert_eq!(lhs, rhs)
    }};
}
/// Key count constant exported to tests.
///
/// NOTE(review): presumably the default number of keys written per test SSTable;
/// it is not referenced in the visible part of this file, so confirm against callers.
pub const TEST_KEYS_COUNT: usize = 10;
/// Creates a mock [`SstableStoreRef`] on top of a fresh `InMemObjectStore`.
///
/// The object store is wrapped with `monitored`, using `ObjectStoreMetrics::unused()`
/// and the default `ObjectStoreConfig`, and is then handed to
/// `mock_sstable_store_with_object_store`.
pub async fn mock_sstable_store() -> SstableStoreRef {
    let in_mem = InMemObjectStore::new();
    let metrics = Arc::new(ObjectStoreMetrics::unused());
    let config = Arc::new(ObjectStoreConfig::default());
    let object_store = Arc::new(ObjectStoreImpl::InMem(in_mem.monitored(metrics, config)));
    mock_sstable_store_with_object_store(object_store).await
}
/// Creates a mock [`SstableStoreRef`] that stores its objects in `store`.
///
/// Two hybrid caches (meta and block) are built with the same builder settings:
/// `memory(64 << 20)`, two shards and `Engine::Large` storage. The store uses the
/// path `"test"`, no recent filter, the new object prefix strategy, and state store
/// metrics at `MetricLevel::Disabled`.
///
/// # Panics
///
/// Panics if either cache fails to build.
pub async fn mock_sstable_store_with_object_store(store: ObjectStoreRef) -> SstableStoreRef {
    // The meta cache is built first, then the block cache.
    let meta_cache = HybridCacheBuilder::new()
        .memory(64 << 20)
        .with_shards(2)
        .storage(Engine::Large)
        .build()
        .await
        .unwrap();
    let block_cache = HybridCacheBuilder::new()
        .memory(64 << 20)
        .with_shards(2)
        .storage(Engine::Large)
        .build()
        .await
        .unwrap();

    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(MetricLevel::Disabled));
    let config = SstableStoreConfig {
        store,
        path: String::from("test"),
        prefetch_buffer_capacity: 64 << 20,
        max_prefetch_block_number: 16,
        recent_filter: None,
        state_store_metrics,
        use_new_object_prefix_strategy: true,
        meta_cache,
        block_cache,
    };
    Arc::new(SstableStore::new(config))
}
/// Generates the table key for test index `idx`.
///
/// The key is the text `key_test_` followed by `idx` zero-padded to five digits,
/// passed through `prefix_slice_with_vnode` with `VirtualNode::ZERO`.
pub fn iterator_test_table_key_of(idx: usize) -> Vec<u8> {
    let suffix = format!("key_test_{:05}", idx);
    let prefixed = prefix_slice_with_vnode(VirtualNode::ZERO, suffix.as_bytes());
    prefixed.to_vec()
}
/// Generates the user key for test index `idx`.
///
/// Combines the default `TableId` with the table key from
/// `iterator_test_table_key_of(idx)`.
pub fn iterator_test_user_key_of(idx: usize) -> UserKey<Vec<u8>> {
    let table_id = TableId::default();
    let table_key = iterator_test_table_key_of(idx);
    UserKey::for_test(table_id, table_key)
}
/// Generates the user key for test index `idx`, with the table key held as `Bytes`.
///
/// Uses the default `TableId` and the same table key bytes as
/// `iterator_test_table_key_of(idx)`.
pub fn iterator_test_bytes_user_key_of(idx: usize) -> UserKey<Bytes> {
    let table_id = TableId::default();
    let table_key = Bytes::from(iterator_test_table_key_of(idx));
    UserKey::for_test(table_id, table_key)
}
/// Generates the full key for test index `idx`.
///
/// The user key comes from `iterator_test_user_key_of(idx)`; the epoch is fixed
/// to `test_epoch(233)`.
pub fn iterator_test_key_of(idx: usize) -> FullKey<Vec<u8>> {
    let user_key = iterator_test_user_key_of(idx);
    let epoch_with_gap = EpochWithGap::new_from_epoch(test_epoch(233));
    FullKey {
        user_key,
        epoch_with_gap,
    }
}
/// Generates the same full key as `iterator_test_key_of(idx)`, converted with
/// `into_bytes` so that it is held as `Bytes`.
pub fn iterator_test_bytes_key_of(idx: usize) -> FullKey<Bytes> {
    let full_key = iterator_test_key_of(idx);
    full_key.into_bytes()
}
/// Generates the full key for test index `idx` at the given `epoch`.
///
/// `epoch` is used as-is: it is passed straight to `EpochWithGap::new_from_epoch`
/// without going through `test_epoch`.
pub fn iterator_test_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Vec<u8>> {
    let user_key = iterator_test_user_key_of(idx);
    let epoch_with_gap = EpochWithGap::new_from_epoch(epoch);
    FullKey {
        user_key,
        epoch_with_gap,
    }
}
/// Generates the full key for test index `idx` at `test_epoch(epoch)`, held as `Bytes`.
///
/// Unlike `iterator_test_key_of_epoch`, this helper applies `test_epoch` to `epoch`
/// before building the key.
pub fn iterator_test_bytes_key_of_epoch(idx: usize, epoch: HummockEpoch) -> FullKey<Bytes> {
    let converted_epoch = test_epoch(epoch);
    let full_key = iterator_test_key_of_epoch(idx, converted_epoch);
    full_key.into_bytes()
}
/// Generates the value bytes for test index `idx`.
///
/// The value is the text `value_test_` followed by `idx` zero-padded to five
/// digits (wider indices are printed in full), e.g. `value_test_00002`.
pub fn iterator_test_value_of(idx: usize) -> Vec<u8> {
    // `into_bytes` reuses the formatted string's buffer instead of copying it.
    format!("value_test_{:05}", idx).into_bytes()
}
pub fn transform_shared_buffer(
batches: Vec<(Vec<u8>, SharedBufferValue<Bytes>)>,
) -> Vec<(TableKey<Bytes>, SharedBufferValue<Bytes>)> {
batches
.into_iter()
.map(|(k, v)| (TableKey(k.into()), v))
.collect_vec()
}
/// Generates a test SSTable through `gen_test_sstable_info` and returns its
/// [`SstableInfo`].
///
/// For every `i` in `0..total` one entry is produced: the key is
/// `iterator_test_key_of(idx_mapping(i))` and the value is a `put` of
/// `iterator_test_value_of(idx_mapping(i))`.
pub async fn gen_iterator_test_sstable_info(
    object_id: HummockSstableObjectId,
    opts: SstableBuilderOptions,
    idx_mapping: impl Fn(usize) -> usize,
    sstable_store: SstableStoreRef,
    total: usize,
) -> SstableInfo {
    let entries = (0..total).map(|i| {
        let key = iterator_test_key_of(idx_mapping(i));
        let value = HummockValue::put(iterator_test_value_of(idx_mapping(i)));
        (key, value)
    });
    gen_test_sstable_info(opts, object_id, entries, sstable_store).await
}
/// Generates a test SSTable through `gen_test_sstable` and returns its
/// [`TableHolder`].
///
/// For every `i` in `0..total` one entry is produced: the key is
/// `iterator_test_key_of(idx_mapping(i))` and the value is a `put` of
/// `iterator_test_value_of(idx_mapping(i))`.
pub async fn gen_iterator_test_sstable_base(
    object_id: HummockSstableObjectId,
    opts: SstableBuilderOptions,
    idx_mapping: impl Fn(usize) -> usize,
    sstable_store: SstableStoreRef,
    total: usize,
) -> TableHolder {
    let entries = (0..total).map(|i| {
        let key = iterator_test_key_of(idx_mapping(i));
        let value = HummockValue::put(iterator_test_value_of(idx_mapping(i)));
        (key, value)
    });
    gen_test_sstable(opts, object_id, entries, sstable_store).await
}
/// Generates a test SSTable from explicit `(index, epoch, value)` triples.
///
/// Each triple becomes an entry whose key is
/// `iterator_test_key_of_epoch(index, test_epoch(epoch))` and whose value is taken
/// as given. The table is built by `gen_test_sstable` with
/// `default_builder_opt_for_test()`.
pub async fn gen_iterator_test_sstable_from_kv_pair(
    object_id: HummockSstableObjectId,
    kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
    sstable_store: SstableStoreRef,
) -> TableHolder {
    let builder_opts = default_builder_opt_for_test();
    let entries = kv_pairs.into_iter().map(|(idx, epoch, value)| {
        let key = iterator_test_key_of_epoch(idx, test_epoch(epoch));
        (key, value)
    });
    gen_test_sstable(builder_opts, object_id, entries, sstable_store).await
}
/// Generates a test SSTable from explicit `(index, epoch, value)` triples and
/// returns its [`SstableInfo`].
///
/// Each triple becomes an entry whose key is
/// `iterator_test_key_of_epoch(index, test_epoch(epoch))` and whose value is taken
/// as given. Building is delegated to `gen_test_sstable_with_range_tombstone` with
/// `default_builder_opt_for_test()`.
pub async fn gen_iterator_test_sstable_with_range_tombstones(
    object_id: HummockSstableObjectId,
    kv_pairs: Vec<(usize, u64, HummockValue<Vec<u8>>)>,
    sstable_store: SstableStoreRef,
) -> SstableInfo {
    let builder_opts = default_builder_opt_for_test();
    let entries = kv_pairs.into_iter().map(|(idx, epoch, value)| {
        let key = iterator_test_key_of_epoch(idx, test_epoch(epoch));
        (key, value)
    });
    gen_test_sstable_with_range_tombstone(builder_opts, object_id, entries, sstable_store).await
}
/// Generates `count` SSTable iterators whose keys interleave with each other.
///
/// All tables live in one fresh `mock_sstable_store()`. Table `i` (object id `i`)
/// holds `key_count` entries whose test indices are `x * count + i` for
/// `x` in `0..key_count`. Each iterator is created with default
/// `SstableIteratorReadOptions`.
pub async fn gen_merge_iterator_interleave_test_sstable_iters(
    key_count: usize,
    count: usize,
) -> Vec<SstableIterator> {
    let store = mock_sstable_store().await;
    let mut iters = Vec::new();
    for table_no in 0..count {
        // Table `table_no` takes every `count`-th index, starting at `table_no`.
        let interleave = |x: usize| x * count + table_no;
        let sstable = gen_iterator_test_sstable_base(
            table_no as HummockSstableObjectId,
            default_builder_opt_for_test(),
            interleave,
            store.clone(),
            key_count,
        )
        .await;
        let iter = SstableIterator::create(
            sstable,
            store.clone(),
            Arc::new(SstableIteratorReadOptions::default()),
        );
        iters.push(iter);
    }
    iters
}
/// Generates a test SSTable whose entries carry increasing epochs.
///
/// For every `i` in `0..total` one entry is produced: the key is
/// `iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i))` and the
/// value is a `put` of `iterator_test_value_of(idx_mapping(i))`. The table is built
/// by `gen_test_sstable`.
pub async fn gen_iterator_test_sstable_with_incr_epoch(
    object_id: HummockSstableObjectId,
    opts: SstableBuilderOptions,
    idx_mapping: impl Fn(usize) -> usize,
    sstable_store: SstableStoreRef,
    total: usize,
    epoch_base: u64,
) -> TableHolder {
    let entries = (0..total).map(|i| {
        let key = iterator_test_key_of_epoch(idx_mapping(i), test_epoch(epoch_base + i as u64));
        let value = HummockValue::put(iterator_test_value_of(idx_mapping(i)));
        (key, value)
    });
    gen_test_sstable(opts, object_id, entries, sstable_store).await
}