// risingwave_storage/hummock/iterator/forward_concat.rs
use crate::hummock::iterator::concat_inner::ConcatIteratorInner;
use crate::hummock::SstableIterator;
/// [`ConcatIteratorInner`] specialized with [`SstableIterator`] as its inner iterator type.
///
/// NOTE(review): going by the file name this is presumably the forward-direction
/// concat iterator — confirm against `concat_inner`.
pub type ConcatIterator = ConcatIteratorInner<SstableIterator>;
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        default_builder_opt_for_test, gen_iterator_test_sstable_info, iterator_test_key_of,
        iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT,
    };
    use crate::hummock::iterator::HummockIterator;
    use crate::hummock::sstable::SstableIteratorReadOptions;

    /// Walks a `ConcatIterator` built over three generated tables from `rewind()` to the
    /// end, checking every key/value pair in order, then checks that a second `rewind()`
    /// positions the iterator back on the first entry.
    #[tokio::test]
    async fn test_concat_iterator() {
        let sstable_store = mock_sstable_store().await;
        // Index mappings are `x`, `TEST_KEYS_COUNT + x` and `TEST_KEYS_COUNT * 2 + x`, so the
        // three tables are generated from consecutive, non-overlapping index ranges.
        let table0 = gen_iterator_test_sstable_info(
            0,
            default_builder_opt_for_test(),
            |x| x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table1 = gen_iterator_test_sstable_info(
            1,
            default_builder_opt_for_test(),
            |x| TEST_KEYS_COUNT + x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table2 = gen_iterator_test_sstable_info(
            2,
            default_builder_opt_for_test(),
            |x| TEST_KEYS_COUNT * 2 + x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let mut iter = ConcatIterator::new(
            vec![table0, table1, table2],
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
        );

        let mut i = 0;
        iter.rewind().await.unwrap();
        while iter.is_valid() {
            let key = iter.key();
            let val = iter.value();
            assert_eq!(key, iterator_test_key_of(i).to_ref());
            assert_eq!(
                val.into_user_value().unwrap(),
                iterator_test_value_of(i).as_slice()
            );
            i += 1;
            iter.next().await.unwrap();
        }
        // The loop alone would also pass if the iterator became invalid too early, so pin
        // the number of visited entries explicitly.
        assert_eq!(i, TEST_KEYS_COUNT * 3);

        // A second rewind must land on the very first entry again.
        iter.rewind().await.unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(0).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(0).as_slice()
        );
    }

    /// Seeks a `ConcatIterator` to keys that the tests expect to find exactly: one in the
    /// middle, the first one, the last one, and finally one past the last generated index,
    /// after which the iterator must be invalid.
    #[tokio::test]
    async fn test_concat_seek() {
        let sstable_store = mock_sstable_store().await;
        let table0 = gen_iterator_test_sstable_info(
            0,
            default_builder_opt_for_test(),
            |x| x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table1 = gen_iterator_test_sstable_info(
            1,
            default_builder_opt_for_test(),
            |x| TEST_KEYS_COUNT + x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table2 = gen_iterator_test_sstable_info(
            2,
            default_builder_opt_for_test(),
            |x| TEST_KEYS_COUNT * 2 + x,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let mut iter = ConcatIterator::new(
            vec![table0, table1, table2],
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
        );

        // Seek to an index produced by the second table's mapping.
        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(TEST_KEYS_COUNT + 1).as_slice()
        );

        // Seek backwards to the very first index.
        iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(0).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(0).as_slice()
        );

        // Seek to the last generated index.
        iter.seek(iterator_test_key_of(3 * TEST_KEYS_COUNT - 1).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(3 * TEST_KEYS_COUNT - 1).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(3 * TEST_KEYS_COUNT - 1).as_slice()
        );

        // Seeking one past the last generated index must leave the iterator invalid.
        iter.seek(iterator_test_key_of(3 * TEST_KEYS_COUNT).to_ref())
            .await
            .unwrap();
        assert!(!iter.is_valid());
    }

    /// Seeks a `ConcatIterator` to keys that were not generated and checks that it lands
    /// on the next generated key instead.
    ///
    /// Every index mapping below doubles its result, so only even indices are generated.
    /// NOTE(review): the first seek target `TEST_KEYS_COUNT + 1` is only a missing key when
    /// `TEST_KEYS_COUNT` is even — confirm against `test_utils`.
    #[tokio::test]
    async fn test_concat_seek_not_exists() {
        let sstable_store = mock_sstable_store().await;
        let table0 = gen_iterator_test_sstable_info(
            0,
            default_builder_opt_for_test(),
            |x| x * 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table1 = gen_iterator_test_sstable_info(
            1,
            default_builder_opt_for_test(),
            |x| (TEST_KEYS_COUNT + x) * 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let table2 = gen_iterator_test_sstable_info(
            2,
            default_builder_opt_for_test(),
            |x| (2 * TEST_KEYS_COUNT + x) * 2,
            sstable_store.clone(),
            TEST_KEYS_COUNT,
        )
        .await;
        let mut iter = ConcatIterator::new(
            vec![table0, table1, table2],
            sstable_store,
            Arc::new(SstableIteratorReadOptions::default()),
        );

        // Missing index: the iterator is expected to stop on the following index.
        iter.seek(iterator_test_key_of(TEST_KEYS_COUNT + 1).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT + 2).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(TEST_KEYS_COUNT + 2).as_slice()
        );

        // One past the last index produced by the second mapping, `(2 * TEST_KEYS_COUNT - 1) * 2`.
        // The next generated index is the third mapping's first one, `TEST_KEYS_COUNT * 4`.
        // Written in terms of `TEST_KEYS_COUNT` so the expectation holds for any count.
        iter.seek(iterator_test_key_of((2 * TEST_KEYS_COUNT - 1) * 2 + 1).to_ref())
            .await
            .unwrap();
        assert!(iter.is_valid());
        let key = iter.key();
        let val = iter.value();
        assert_eq!(key, iterator_test_key_of(TEST_KEYS_COUNT * 4).to_ref());
        assert_eq!(
            val.into_user_value().unwrap(),
            iterator_test_value_of(TEST_KEYS_COUNT * 4).as_slice()
        );
    }
}